PostgreSQL Source Code  git master
pgoutput.c File Reference
Include dependency graph for pgoutput.c:

Go to the source code of this file.

Data Structures

struct  RelationSyncEntry
 
struct  PGOutputTxnData
 

Macros

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)
 

Typedefs

typedef struct RelationSyncEntry RelationSyncEntry
 
typedef struct PGOutputTxnData PGOutputTxnData
 

Enumerations

enum  RowFilterPubAction { PUBACTION_INSERT , PUBACTION_UPDATE , PUBACTION_DELETE }
 

Functions

static void pgoutput_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pgoutput_shutdown (LogicalDecodingContext *ctx)
 
static void pgoutput_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
 
static void pgoutput_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static void pgoutput_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pgoutput_origin_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pgoutput_begin_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pgoutput_commit_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_rollback_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
static void pgoutput_stream_start (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_stop (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_abort (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pgoutput_stream_commit (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_stream_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static List * LoadPublications (List *pubnames)
 
static void publication_invalidation_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void send_relation_and_attrs (Relation relation, TransactionId xid, LogicalDecodingContext *ctx, Bitmapset *columns)
 
static void send_repl_origin (LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
 
static void init_rel_sync_cache (MemoryContext cachectx)
 
static void cleanup_rel_sync_cache (TransactionId xid, bool is_commit)
 
static RelationSyncEntry * get_rel_sync_entry (PGOutputData *data, Relation relation)
 
static void rel_sync_cache_relation_cb (Datum arg, Oid relid)
 
static void rel_sync_cache_publication_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void set_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static bool get_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static void init_tuple_slot (PGOutputData *data, Relation relation, RelationSyncEntry *entry)
 
static EState * create_estate_for_relation (Relation rel)
 
static void pgoutput_row_filter_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
static bool pgoutput_row_filter_exec_expr (ExprState *state, ExprContext *econtext)
 
static bool pgoutput_row_filter (Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
 
static void pgoutput_column_list_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void parse_output_parameters (List *options, PGOutputData *data)
 
static void pgoutput_send_begin (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void maybe_send_schema (LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
 
static void pgoutput_ensure_entry_cxt (PGOutputData *data, RelationSyncEntry *entry)
 

Variables

 PG_MODULE_MAGIC
 
static bool publications_valid
 
static HTAB * RelationSyncCache = NULL
 

Macro Definition Documentation

◆ NUM_ROWFILTER_PUBACTIONS

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)

Definition at line 105 of file pgoutput.c.

Typedef Documentation

◆ PGOutputTxnData

◆ RelationSyncEntry

Enumeration Type Documentation

◆ RowFilterPubAction

Enumerator
PUBACTION_INSERT 
PUBACTION_UPDATE 
PUBACTION_DELETE 

Definition at line 98 of file pgoutput.c.

99 {
103 };
@ PUBACTION_INSERT
Definition: pgoutput.c:100
@ PUBACTION_UPDATE
Definition: pgoutput.c:101
@ PUBACTION_DELETE
Definition: pgoutput.c:102

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks * cb)

Definition at line 247 of file pgoutput.c.

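248 {
249  cb->startup_cb = pgoutput_startup;
250  cb->begin_cb = pgoutput_begin_txn;
251  cb->change_cb = pgoutput_change;
252  cb->truncate_cb = pgoutput_truncate;
253  cb->message_cb = pgoutput_message;
254  cb->commit_cb = pgoutput_commit_txn;
255 
256  cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
257  cb->prepare_cb = pgoutput_prepare_txn;
258  cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
259  cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
260  cb->filter_by_origin_cb = pgoutput_origin_filter;
261  cb->shutdown_cb = pgoutput_shutdown;
262 
263  /* transaction streaming */
264  cb->stream_start_cb = pgoutput_stream_start;
265  cb->stream_stop_cb = pgoutput_stream_stop;
266  cb->stream_abort_cb = pgoutput_stream_abort;
267  cb->stream_commit_cb = pgoutput_stream_commit;
268  cb->stream_change_cb = pgoutput_change;
269  cb->stream_message_cb = pgoutput_message;
270  cb->stream_truncate_cb = pgoutput_truncate;
271  /* transaction streaming - two-phase commit */
272  cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
273 }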
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
Definition: pgoutput.c:1403
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:616
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:418
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:1572
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:633
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:1685
static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: pgoutput.c:661
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:1704
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: pgoutput.c:1807
static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:1868
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:548
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:1840
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:647
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1786
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1754
static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: pgoutput.c:1640
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:584
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb

References OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::begin_prepare_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::commit_prepared_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::message_cb, pgoutput_begin_prepare_txn(), pgoutput_begin_txn(), pgoutput_change(), pgoutput_commit_prepared_txn(), pgoutput_commit_txn(), pgoutput_message(), pgoutput_origin_filter(), pgoutput_prepare_txn(), pgoutput_rollback_prepared_txn(), pgoutput_shutdown(), pgoutput_startup(), pgoutput_stream_abort(), pgoutput_stream_commit(), pgoutput_stream_prepare_txn(), pgoutput_stream_start(), pgoutput_stream_stop(), pgoutput_truncate(), OutputPluginCallbacks::prepare_cb, OutputPluginCallbacks::rollback_prepared_cb, OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_prepare_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

◆ cleanup_rel_sync_cache()

static void cleanup_rel_sync_cache ( TransactionId  xid,
bool  is_commit 
)
static

Definition at line 2242 of file pgoutput.c.

2243 {
2244  HASH_SEQ_STATUS hash_seq;
2245  RelationSyncEntry *entry;
2246 
2247  Assert(RelationSyncCache != NULL);
2248 
2249  hash_seq_init(&hash_seq, RelationSyncCache);
2250  while ((entry = hash_seq_search(&hash_seq)) != NULL)
2251  {
2252  /*
2253  * We can set the schema_sent flag for an entry that has committed xid
2254  * in the list as that ensures that the subscriber would have the
2255  * corresponding schema and we don't need to send it unless there is
2256  * any invalidation for that relation.
2257  */
2258  foreach_xid(streamed_txn, entry->streamed_txns)
2259  {
2260  if (xid == streamed_txn)
2261  {
2262  if (is_commit)
2263  entry->schema_sent = true;
2264 
2265  entry->streamed_txns =
2266  foreach_delete_current(entry->streamed_txns, streamed_txn);
2267  break;
2268  }
2269  }
2270  }
2271 }
#define Assert(condition)
Definition: c.h:858
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1420
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
#define foreach_delete_current(lst, var_or_cell)
Definition: pg_list.h:391
#define foreach_xid(var, lst)
Definition: pg_list.h:472
static HTAB * RelationSyncCache
Definition: pgoutput.c:210
List * streamed_txns
Definition: pgoutput.c:132

References Assert, foreach_delete_current, foreach_xid, hash_seq_init(), hash_seq_search(), RelationSyncCache, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_stream_abort(), and pgoutput_stream_commit().

◆ create_estate_for_relation()

static EState * create_estate_for_relation ( Relation  rel)
static

Definition at line 794 of file pgoutput.c.

795 {
796  EState *estate;
797  RangeTblEntry *rte;
798  List *perminfos = NIL;
799 
800  estate = CreateExecutorState();
801 
802  rte = makeNode(RangeTblEntry);
803  rte->rtekind = RTE_RELATION;
804  rte->relid = RelationGetRelid(rel);
805  rte->relkind = rel->rd_rel->relkind;
806  rte->rellockmode = AccessShareLock;
807 
808  addRTEPermissionInfo(&perminfos, rte);
809 
810  ExecInitRangeTable(estate, list_make1(rte), perminfos);
811 
812  estate->es_output_cid = GetCurrentCommandId(false);
813 
814  return estate;
815 }
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos)
Definition: execUtils.c:728
EState * CreateExecutorState(void)
Definition: execUtils.c:88
#define AccessShareLock
Definition: lockdefs.h:36
#define makeNode(_type_)
Definition: nodes.h:155
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
Definition: parsenodes.h:1017
#define NIL
Definition: pg_list.h:68
#define list_make1(x1)
Definition: pg_list.h:212
#define RelationGetRelid(relation)
Definition: rel.h:505
CommandId es_output_cid
Definition: execnodes.h:644
Definition: pg_list.h:54
RTEKind rtekind
Definition: parsenodes.h:1047
Form_pg_class rd_rel
Definition: rel.h:111
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:828

References AccessShareLock, addRTEPermissionInfo(), CreateExecutorState(), EState::es_output_cid, ExecInitRangeTable(), GetCurrentCommandId(), list_make1, makeNode, NIL, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by pgoutput_row_filter_init().

◆ get_rel_sync_entry()

static RelationSyncEntry * get_rel_sync_entry ( PGOutputData * data,
Relation  relation 
)
static

Definition at line 1973 of file pgoutput.c.

1974 {
1975  RelationSyncEntry *entry;
1976  bool found;
1977  MemoryContext oldctx;
1978  Oid relid = RelationGetRelid(relation);
1979 
1980  Assert(RelationSyncCache != NULL);
1981 
1982  /* Find cached relation info, creating if not found */
1983  entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
1984  &relid,
1985  HASH_ENTER, &found);
1986  Assert(entry != NULL);
1987 
1988  /* initialize entry, if it's new */
1989  if (!found)
1990  {
1991  entry->replicate_valid = false;
1992  entry->schema_sent = false;
1993  entry->streamed_txns = NIL;
1994  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
1995  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
1996  entry->new_slot = NULL;
1997  entry->old_slot = NULL;
1998  memset(entry->exprstate, 0, sizeof(entry->exprstate));
1999  entry->entry_cxt = NULL;
2000  entry->publish_as_relid = InvalidOid;
2001  entry->columns = NULL;
2002  entry->attrmap = NULL;
2003  }
2004 
2005  /* Validate the entry */
2006  if (!entry->replicate_valid)
2007  {
2008  Oid schemaId = get_rel_namespace(relid);
2009  List *pubids = GetRelationPublications(relid);
2010 
2011  /*
2012  * We don't acquire a lock on the namespace system table as we build
2013  * the cache entry using a historic snapshot and all the later changes
2014  * are absorbed while decoding WAL.
2015  */
2016  List *schemaPubids = GetSchemaPublications(schemaId);
2017  ListCell *lc;
2018  Oid publish_as_relid = relid;
2019  int publish_ancestor_level = 0;
2020  bool am_partition = get_rel_relispartition(relid);
2021  char relkind = get_rel_relkind(relid);
2022  List *rel_publications = NIL;
2023 
2024  /* Reload publications if needed before use. */
2025  if (!publications_valid)
2026  {
2027  oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2028  if (data->publications)
2029  {
2030  list_free_deep(data->publications);
2031  data->publications = NIL;
2032  }
2033  data->publications = LoadPublications(data->publication_names);
2034  MemoryContextSwitchTo(oldctx);
2035  publications_valid = true;
2036  }
2037 
2038  /*
2039  * Reset schema_sent status as the relation definition may have
2040  * changed. Also reset pubactions to empty in case rel was dropped
2041  * from a publication. Also free any objects that depended on the
2042  * earlier definition.
2043  */
2044  entry->schema_sent = false;
2045  list_free(entry->streamed_txns);
2046  entry->streamed_txns = NIL;
2047  bms_free(entry->columns);
2048  entry->columns = NULL;
2049  entry->pubactions.pubinsert = false;
2050  entry->pubactions.pubupdate = false;
2051  entry->pubactions.pubdelete = false;
2052  entry->pubactions.pubtruncate = false;
2053 
2054  /*
2055  * Tuple slots cleanups. (Will be rebuilt later if needed).
2056  */
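2057  if (entry->old_slot)
2058  ExecDropSingleTupleTableSlot(entry->old_slot);
2059  if (entry->new_slot)
2060  ExecDropSingleTupleTableSlot(entry->new_slot);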
2061 
2062  entry->old_slot = NULL;
2063  entry->new_slot = NULL;
2064 
2065  if (entry->attrmap)
2066  free_attrmap(entry->attrmap);
2067  entry->attrmap = NULL;
2068 
2069  /*
2070  * Row filter cache cleanups.
2071  */
2072  if (entry->entry_cxt)
2073  MemoryContextDelete(entry->entry_cxt);
2074 
2075  entry->entry_cxt = NULL;
2076  entry->estate = NULL;
2077  memset(entry->exprstate, 0, sizeof(entry->exprstate));
2078 
2079  /*
2080  * Build publication cache. We can't use one provided by relcache as
2081  * relcache considers all publications that the given relation is in,
2082  * but here we only need to consider ones that the subscriber
2083  * requested.
2084  */
2085  foreach(lc, data->publications)
2086  {
2087  Publication *pub = lfirst(lc);
2088  bool publish = false;
2089 
2090  /*
2091  * Under what relid should we publish changes in this publication?
2092  * We'll use the top-most relid across all publications. Also
2093  * track the ancestor level for this publication.
2094  */
2095  Oid pub_relid = relid;
2096  int ancestor_level = 0;
2097 
2098  /*
2099  * If this is a FOR ALL TABLES publication, pick the partition
2100  * root and set the ancestor level accordingly.
2101  */
2102  if (pub->alltables)
2103  {
2104  publish = true;
2105  if (pub->pubviaroot && am_partition)
2106  {
2107  List *ancestors = get_partition_ancestors(relid);
2108 
2109  pub_relid = llast_oid(ancestors);
2110  ancestor_level = list_length(ancestors);
2111  }
2112  }
2113 
2114  if (!publish)
2115  {
2116  bool ancestor_published = false;
2117 
2118  /*
2119  * For a partition, check if any of the ancestors are
2120  * published. If so, note down the topmost ancestor that is
2121  * published via this publication, which will be used as the
2122  * relation via which to publish the partition's changes.
2123  */
2124  if (am_partition)
2125  {
2126  Oid ancestor;
2127  int level;
2128  List *ancestors = get_partition_ancestors(relid);
2129 
2130  ancestor = GetTopMostAncestorInPublication(pub->oid,
2131  ancestors,
2132  &level);
2133 
2134  if (ancestor != InvalidOid)
2135  {
2136  ancestor_published = true;
2137  if (pub->pubviaroot)
2138  {
2139  pub_relid = ancestor;
2140  ancestor_level = level;
2141  }
2142  }
2143  }
2144 
2145  if (list_member_oid(pubids, pub->oid) ||
2146  list_member_oid(schemaPubids, pub->oid) ||
2147  ancestor_published)
2148  publish = true;
2149  }
2150 
2151  /*
2152  * If the relation is to be published, determine actions to
2153  * publish, and list of columns, if appropriate.
2154  *
2155  * Don't publish changes for partitioned tables, because
2156  * publishing those of its partitions suffices, unless partition
2157  * changes won't be published due to pubviaroot being set.
2158  */
2159  if (publish &&
2160  (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2161  {
2162  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2163  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2164  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2165  entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2166 
2167  /*
2168  * We want to publish the changes as the top-most ancestor
2169  * across all publications. So we need to check if the already
2170  * calculated level is higher than the new one. If yes, we can
2171  * ignore the new value (as it's a child). Otherwise the new
2172  * value is an ancestor, so we keep it.
2173  */
2174  if (publish_ancestor_level > ancestor_level)
2175  continue;
2176 
2177  /*
2178  * If we found an ancestor higher up in the tree, discard the
2179  * list of publications through which we replicate it, and use
2180  * the new ancestor.
2181  */
2182  if (publish_ancestor_level < ancestor_level)
2183  {
2184  publish_as_relid = pub_relid;
2185  publish_ancestor_level = ancestor_level;
2186 
2187  /* reset the publication list for this relation */
2188  rel_publications = NIL;
2189  }
2190  else
2191  {
2192  /* Same ancestor level, has to be the same OID. */
2193  Assert(publish_as_relid == pub_relid);
2194  }
2195 
2196  /* Track publications for this ancestor. */
2197  rel_publications = lappend(rel_publications, pub);
2198  }
2199  }
2200 
2201  entry->publish_as_relid = publish_as_relid;
2202 
2203  /*
2204  * Initialize the tuple slot, map, and row filter. These are only used
2205  * when publishing inserts, updates, or deletes.
2206  */
2207  if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2208  entry->pubactions.pubdelete)
2209  {
2210  /* Initialize the tuple slot and map */
2211  init_tuple_slot(data, relation, entry);
2212 
2213  /* Initialize the row filter */
2214  pgoutput_row_filter_init(data, rel_publications, entry);
2215 
2216  /* Initialize the column list */
2217  pgoutput_column_list_init(data, rel_publications, entry);
2218  }
2219 
2220  list_free(pubids);
2221  list_free(schemaPubids);
2222  list_free(rel_publications);
2223 
2224  entry->replicate_valid = true;
2225  }
2226 
2227  return entry;
2228 }
void free_attrmap(AttrMap *map)
Definition: attmap.c:56
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1341
@ HASH_ENTER
Definition: hsearch.h:114
List * lappend(List *list, void *datum)
Definition: list.c:339
void list_free(List *list)
Definition: list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
void list_free_deep(List *list)
Definition: list.c:1560
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:2027
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2003
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1952
MemoryContext CacheMemoryContext
Definition: mcxt.c:152
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
List * get_partition_ancestors(Oid relid)
Definition: partition.c:134
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define llast_oid(l)
Definition: pg_list.h:200
List * GetSchemaPublications(Oid schemaid)
List * GetRelationPublications(Oid relid)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:1717
static void init_tuple_slot(PGOutputData *data, Relation relation, RelationSyncEntry *entry)
Definition: pgoutput.c:1130
static void pgoutput_row_filter_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:869
static void pgoutput_column_list_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:1015
static bool publications_valid
Definition: pgoutput.c:82
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
MemoryContextSwitchTo(old_ctx)
PublicationActions pubactions
ExprState * exprstate[NUM_ROWFILTER_PUBACTIONS]
Definition: pgoutput.c:145
Bitmapset * columns
Definition: pgoutput.c:171
PublicationActions pubactions
Definition: pgoutput.c:136
TupleTableSlot * old_slot
Definition: pgoutput.c:148
bool replicate_valid
Definition: pgoutput.c:129
MemoryContext entry_cxt
Definition: pgoutput.c:177
EState * estate
Definition: pgoutput.c:146
TupleTableSlot * new_slot
Definition: pgoutput.c:147
AttrMap * attrmap
Definition: pgoutput.c:164

References Publication::alltables, Assert, RelationSyncEntry::attrmap, bms_free(), CacheMemoryContext, RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecDropSingleTupleTableSlot(), RelationSyncEntry::exprstate, free_attrmap(), get_partition_ancestors(), get_rel_namespace(), get_rel_relispartition(), get_rel_relkind(), GetRelationPublications(), GetSchemaPublications(), GetTopMostAncestorInPublication(), HASH_ENTER, hash_search(), init_tuple_slot(), InvalidOid, lappend(), lfirst, list_free(), list_free_deep(), list_length(), list_member_oid(), llast_oid, LoadPublications(), MemoryContextDelete(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, NIL, Publication::oid, RelationSyncEntry::old_slot, pgoutput_column_list_init(), pgoutput_row_filter_init(), RelationSyncEntry::pubactions, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, publications_valid, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, PublicationActions::pubupdate, Publication::pubviaroot, RelationGetRelid, RelationSyncCache, RelationSyncEntry::replicate_valid, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ get_schema_sent_in_streamed_txn()

static bool get_schema_sent_in_streamed_txn ( RelationSyncEntry * entry,
TransactionId  xid 
)
static

Definition at line 1942 of file pgoutput.c.

1943 {
1944  return list_member_xid(entry->streamed_txns, xid);
1945 }
bool list_member_xid(const List *list, TransactionId datum)
Definition: list.c:742

References list_member_xid(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

◆ init_rel_sync_cache()

static void init_rel_sync_cache ( MemoryContext  cachectx)
static

Definition at line 1888 of file pgoutput.c.

1889 {
1890  HASHCTL ctl;
1891  static bool relation_callbacks_registered = false;
1892 
1893  /* Nothing to do if hash table already exists */
1894  if (RelationSyncCache != NULL)
1895  return;
1896 
1897  /* Make a new hash table for the cache */
1898  ctl.keysize = sizeof(Oid);
1899  ctl.entrysize = sizeof(RelationSyncEntry);
1900  ctl.hcxt = cachectx;
1901 
1902  RelationSyncCache = hash_create("logical replication output relation cache",
1903  128, &ctl,
1904  HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1905 
1906  Assert(RelationSyncCache != NULL);
1907 
1908  /* No more to do if we already registered callbacks */
1909  if (relation_callbacks_registered)
1910  return;
1911 
1912  /* We must update the cache entry for a relation after a relcache flush */
1913  CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1914 
1915  /*
1916  * Flush all cache entries after a pg_namespace change, in case it was a
1917  * schema rename affecting a relation being replicated.
1918  */
1919  CacheRegisterSyscacheCallback(NAMESPACEOID,
1920  rel_sync_cache_publication_cb,
1921  (Datum) 0);
1922 
1923  /*
1924  * Flush all cache entries after any publication changes. (We need no
1925  * callback entry for pg_publication, because publication_invalidation_cb
1926  * will take care of it.)
1927  */
1928  CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1929  rel_sync_cache_publication_cb,
1930  (Datum) 0);
1931  CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1932  rel_sync_cache_publication_cb,
1933  (Datum) 0);
1934 
1935  relation_callbacks_registered = true;
1936 }
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1558
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1516
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:2328
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:2277
uintptr_t Datum
Definition: postgres.h:64
tree ctl
Definition: radixtree.h:1853

References Assert, CacheRegisterRelcacheCallback(), CacheRegisterSyscacheCallback(), ctl, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, rel_sync_cache_publication_cb(), rel_sync_cache_relation_cb(), and RelationSyncCache.

Referenced by pgoutput_startup().

◆ init_tuple_slot()

static void init_tuple_slot ( PGOutputData * data,
Relation  relation,
RelationSyncEntry * entry
)
static

Definition at line 1130 of file pgoutput.c.

1132 {
1133  MemoryContext oldctx;
1134  TupleDesc oldtupdesc;
1135  TupleDesc newtupdesc;
1136 
1137  oldctx = MemoryContextSwitchTo(data->cachectx);
1138 
1139  /*
1140  * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1141  * live as long as the cache remains.
1142  */
1143  oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1144  newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1145 
1146  entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1147  entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1148 
1149  MemoryContextSwitchTo(oldctx);
1150 
1151  /*
1152  * Cache the map that will be used to convert the relation's tuples into
1153  * the ancestor's format, if needed.
1154  */
1155  if (entry->publish_as_relid != RelationGetRelid(relation))
1156  {
1157  Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1158  TupleDesc indesc = RelationGetDescr(relation);
1159  TupleDesc outdesc = RelationGetDescr(ancestor);
1160 
1161  /* Map must live as long as the session does. */
1162  oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1163 
1164  entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1165 
1166  MemoryContextSwitchTo(oldctx);
1167  RelationClose(ancestor);
1168  }
1169 }
AttrMap * build_attrmap_by_name_if_req(TupleDesc indesc, TupleDesc outdesc, bool missing_ok)
Definition: attmap.c:263
const TupleTableSlotOps TTSOpsHeapTuple
Definition: execTuples.c:85
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1325
#define RelationGetDescr(relation)
Definition: rel.h:531
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2061
void RelationClose(Relation relation)
Definition: relcache.c:2192
TupleDesc CreateTupleDescCopyConstr(TupleDesc tupdesc)
Definition: tupdesc.c:173

References RelationSyncEntry::attrmap, build_attrmap_by_name_if_req(), CacheMemoryContext, CreateTupleDescCopyConstr(), data, MakeSingleTupleTableSlot(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, RelationSyncEntry::old_slot, RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), and TTSOpsHeapTuple.

Referenced by get_rel_sync_entry().

◆ LoadPublications()

static List * LoadPublications ( List * pubnames)
static

Definition at line 1717 of file pgoutput.c.

1718 {
1719  List *result = NIL;
1720  ListCell *lc;
1721 
1722  foreach(lc, pubnames)
1723  {
1724  char *pubname = (char *) lfirst(lc);
1725  Publication *pub = GetPublicationByName(pubname, false);
1726 
1727  result = lappend(result, pub);
1728  }
1729 
1730  return result;
1731 }
Publication * GetPublicationByName(const char *pubname, bool missing_ok)

References GetPublicationByName(), lappend(), lfirst, and NIL.

Referenced by get_rel_sync_entry().

◆ maybe_send_schema()

static void maybe_send_schema ( LogicalDecodingContext * ctx,
ReorderBufferChange * change,
Relation  relation,
RelationSyncEntry * relentry
)
static

Definition at line 679 of file pgoutput.c.

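682 {
683  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
684  bool schema_sent;
685  TransactionId xid = InvalidTransactionId;
686  TransactionId topxid = InvalidTransactionId;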
687 
688  /*
689  * Remember XID of the (sub)transaction for the change. We don't care if
690  * it's top-level transaction or not (we have already sent that XID in
691  * start of the current streaming block).
692  *
693  * If we're not in a streaming block, just use InvalidTransactionId and
694  * the write methods will not include it.
695  */
696  if (data->in_streaming)
697  xid = change->txn->xid;
698 
699  if (rbtxn_is_subtxn(change->txn))
700  topxid = rbtxn_get_toptxn(change->txn)->xid;
701  else
702  topxid = xid;
703 
704  /*
705  * Do we need to send the schema? We do track streamed transactions
706  * separately, because those may be applied later (and the regular
707  * transactions won't see their effects until then) and in an order that
708  * we don't know at this point.
709  *
710  * XXX There is a scope of optimization here. Currently, we always send
711  * the schema first time in a streaming transaction but we can probably
712  * avoid that by checking 'relentry->schema_sent' flag. However, before
713  * doing that we need to study its impact on the case where we have a mix
714  * of streaming and non-streaming transactions.
715  */
716  if (data->in_streaming)
717  schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
718  else
719  schema_sent = relentry->schema_sent;
720 
721  /* Nothing to do if we already sent the schema. */
722  if (schema_sent)
723  return;
724 
725  /*
726  * Send the schema. If the changes will be published using an ancestor's
727  * schema, not the relation's own, send that ancestor's schema before
728  * sending relation's own (XXX - maybe sending only the former suffices?).
729  */
730  if (relentry->publish_as_relid != RelationGetRelid(relation))
731  {
732  Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
733 
734  send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
735  RelationClose(ancestor);
736  }
737 
738  send_relation_and_attrs(relation, xid, ctx, relentry->columns);
739 
740  if (data->in_streaming)
741  set_schema_sent_in_streamed_txn(relentry, topxid);
742  else
743  relentry->schema_sent = true;
744 }
uint32 TransactionId
Definition: c.h:652
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, Bitmapset *columns)
Definition: pgoutput.c:750
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1952
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1942
#define rbtxn_get_toptxn(txn)
#define rbtxn_is_subtxn(txn)
void * output_plugin_private
Definition: logical.h:76
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
TransactionId xid
#define InvalidTransactionId
Definition: transam.h:31

References RelationSyncEntry::columns, data, get_schema_sent_in_streamed_txn(), if(), InvalidTransactionId, LogicalDecodingContext::output_plugin_private, RelationSyncEntry::publish_as_relid, rbtxn_get_toptxn, rbtxn_is_subtxn, RelationClose(), RelationGetRelid, RelationIdGetRelation(), RelationSyncEntry::schema_sent, send_relation_and_attrs(), set_schema_sent_in_streamed_txn(), ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ parse_output_parameters()

static void parse_output_parameters ( List options,
PGOutputData data 
)
static

Definition at line 276 of file pgoutput.c.

277 {
278  ListCell *lc;
279  bool protocol_version_given = false;
280  bool publication_names_given = false;
281  bool binary_option_given = false;
282  bool messages_option_given = false;
283  bool streaming_given = false;
284  bool two_phase_option_given = false;
285  bool origin_option_given = false;
286 
287  data->binary = false;
288  data->streaming = LOGICALREP_STREAM_OFF;
289  data->messages = false;
290  data->two_phase = false;
291 
292  foreach(lc, options)
293  {
294  DefElem *defel = (DefElem *) lfirst(lc);
295 
296  Assert(defel->arg == NULL || IsA(defel->arg, String));
297 
298  /* Check each param, whether or not we recognize it */
299  if (strcmp(defel->defname, "proto_version") == 0)
300  {
301  unsigned long parsed;
302  char *endptr;
303 
304  if (protocol_version_given)
305  ereport(ERROR,
306  (errcode(ERRCODE_SYNTAX_ERROR),
307  errmsg("conflicting or redundant options")));
308  protocol_version_given = true;
309 
310  errno = 0;
311  parsed = strtoul(strVal(defel->arg), &endptr, 10);
312  if (errno != 0 || *endptr != '\0')
313  ereport(ERROR,
314  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
315  errmsg("invalid proto_version")));
316 
317  if (parsed > PG_UINT32_MAX)
318  ereport(ERROR,
319  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
320  errmsg("proto_version \"%s\" out of range",
321  strVal(defel->arg))));
322 
323  data->protocol_version = (uint32) parsed;
324  }
325  else if (strcmp(defel->defname, "publication_names") == 0)
326  {
327  if (publication_names_given)
328  ereport(ERROR,
329  (errcode(ERRCODE_SYNTAX_ERROR),
330  errmsg("conflicting or redundant options")));
331  publication_names_given = true;
332 
333  if (!SplitIdentifierString(strVal(defel->arg), ',',
334  &data->publication_names))
335  ereport(ERROR,
336  (errcode(ERRCODE_INVALID_NAME),
337  errmsg("invalid publication_names syntax")));
338  }
339  else if (strcmp(defel->defname, "binary") == 0)
340  {
341  if (binary_option_given)
342  ereport(ERROR,
343  (errcode(ERRCODE_SYNTAX_ERROR),
344  errmsg("conflicting or redundant options")));
345  binary_option_given = true;
346 
347  data->binary = defGetBoolean(defel);
348  }
349  else if (strcmp(defel->defname, "messages") == 0)
350  {
351  if (messages_option_given)
352  ereport(ERROR,
353  (errcode(ERRCODE_SYNTAX_ERROR),
354  errmsg("conflicting or redundant options")));
355  messages_option_given = true;
356 
357  data->messages = defGetBoolean(defel);
358  }
359  else if (strcmp(defel->defname, "streaming") == 0)
360  {
361  if (streaming_given)
362  ereport(ERROR,
363  (errcode(ERRCODE_SYNTAX_ERROR),
364  errmsg("conflicting or redundant options")));
365  streaming_given = true;
366 
367  data->streaming = defGetStreamingMode(defel);
368  }
369  else if (strcmp(defel->defname, "two_phase") == 0)
370  {
371  if (two_phase_option_given)
372  ereport(ERROR,
373  (errcode(ERRCODE_SYNTAX_ERROR),
374  errmsg("conflicting or redundant options")));
375  two_phase_option_given = true;
376 
377  data->two_phase = defGetBoolean(defel);
378  }
379  else if (strcmp(defel->defname, "origin") == 0)
380  {
381  char *origin;
382 
383  if (origin_option_given)
384  ereport(ERROR,
385  errcode(ERRCODE_SYNTAX_ERROR),
386  errmsg("conflicting or redundant options"));
387  origin_option_given = true;
388 
389  origin = defGetString(defel);
390  if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
391  data->publish_no_origin = true;
392  else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
393  data->publish_no_origin = false;
394  else
395  ereport(ERROR,
396  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
397  errmsg("unrecognized origin value: \"%s\"", origin));
398  }
399  else
400  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
401  }
402 
403  /* Check required options */
404  if (!protocol_version_given)
405  ereport(ERROR,
406  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
407  errmsg("option \"%s\" missing", "proto_version"));
408  if (!publication_names_given)
409  ereport(ERROR,
410  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
411  errmsg("option \"%s\" missing", "publication_names"));
412 }
unsigned int uint32
Definition: c.h:506
#define PG_UINT32_MAX
Definition: c.h:590
bool defGetBoolean(DefElem *def)
Definition: define.c:107
char * defGetString(DefElem *def)
Definition: define.c:48
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
#define IsA(nodeptr, _type_)
Definition: nodes.h:158
#define LOGICALREP_ORIGIN_NONE
#define LOGICALREP_ORIGIN_ANY
#define LOGICALREP_STREAM_OFF
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
char * defname
Definition: parsenodes.h:817
Node * arg
Definition: parsenodes.h:818
Definition: value.h:64
char defGetStreamingMode(DefElem *def)
#define strVal(v)
Definition: value.h:82
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3432

References DefElem::arg, Assert, data, defGetBoolean(), defGetStreamingMode(), defGetString(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, IsA, lfirst, LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE, LOGICALREP_STREAM_OFF, pg_strcasecmp(), PG_UINT32_MAX, SplitIdentifierString(), and strVal.

Referenced by pgoutput_startup().

◆ pgoutput_begin_prepare_txn()

static void pgoutput_begin_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 616 of file pgoutput.c.

617 {
618  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
619 
620  OutputPluginPrepareWrite(ctx, !send_replication_origin);
622 
623  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
624  send_replication_origin);
625 
626  OutputPluginWrite(ctx, true);
627 }
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:722
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:709
#define InvalidRepOriginId
Definition: origin.h:33
static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
Definition: pgoutput.c:2354
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:127
StringInfo out
Definition: logical.h:71
RepOriginId origin_id
XLogRecPtr origin_lsn

References InvalidRepOriginId, logicalrep_write_begin_prepare(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and send_repl_origin().

Referenced by _PG_output_plugin_init().

◆ pgoutput_begin_txn()

static void pgoutput_begin_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 548 of file pgoutput.c.

549 {
551  sizeof(PGOutputTxnData));
552 
553  txn->output_plugin_private = txndata;
554 }
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1215
MemoryContext context
Definition: logical.h:36
void * output_plugin_private

References LogicalDecodingContext::context, MemoryContextAllocZero(), and ReorderBufferTXN::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pgoutput_change()

static void pgoutput_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  relation,
ReorderBufferChange change 
)
static

Definition at line 1403 of file pgoutput.c.

1405 {
1408  MemoryContext old;
1409  RelationSyncEntry *relentry;
1411  Relation ancestor = NULL;
1412  Relation targetrel = relation;
1414  TupleTableSlot *old_slot = NULL;
1415  TupleTableSlot *new_slot = NULL;
1416 
1417  if (!is_publishable_relation(relation))
1418  return;
1419 
1420  /*
1421  * Remember the xid for the change in streaming mode. We need to send xid
1422  * with each change in streaming mode so that the subscriber can associate
1423  * changes with their transaction and, on abort, discard the corresponding
1424  * changes.
1425  */
1426  if (data->in_streaming)
1427  xid = change->txn->xid;
1428 
1429  relentry = get_rel_sync_entry(data, relation);
1430 
1431  /* First check the table filter */
1432  switch (action)
1433  {
1435  if (!relentry->pubactions.pubinsert)
1436  return;
1437  break;
1439  if (!relentry->pubactions.pubupdate)
1440  return;
1441  break;
1443  if (!relentry->pubactions.pubdelete)
1444  return;
1445 
1446  /*
1447  * This is only possible if deletes are allowed even when replica
1448  * identity is not defined for a table. Since the DELETE action
1449  * can't be published, we simply return.
1450  */
1451  if (!change->data.tp.oldtuple)
1452  {
1453  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1454  return;
1455  }
1456  break;
1457  default:
1458  Assert(false);
1459  }
1460 
1461  /* Avoid leaking memory by using and resetting our own context */
1462  old = MemoryContextSwitchTo(data->context);
1463 
1464  /* Switch relation if publishing via root. */
1465  if (relentry->publish_as_relid != RelationGetRelid(relation))
1466  {
1467  Assert(relation->rd_rel->relispartition);
1468  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1469  targetrel = ancestor;
1470  }
1471 
1472  if (change->data.tp.oldtuple)
1473  {
1474  old_slot = relentry->old_slot;
1475  ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1476 
1477  /* Convert tuple if needed. */
1478  if (relentry->attrmap)
1479  {
1481  &TTSOpsVirtual);
1482 
1483  old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1484  }
1485  }
1486 
1487  if (change->data.tp.newtuple)
1488  {
1489  new_slot = relentry->new_slot;
1490  ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1491 
1492  /* Convert tuple if needed. */
1493  if (relentry->attrmap)
1494  {
1496  &TTSOpsVirtual);
1497 
1498  new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1499  }
1500  }
1501 
1502  /*
1503  * Check row filter.
1504  *
1505  * Updates could be transformed to inserts or deletes based on the results
1506  * of the row filter for old and new tuple.
1507  */
1508  if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1509  goto cleanup;
1510 
1511  /*
1512  * Send BEGIN if we haven't yet.
1513  *
1514  * We send the BEGIN message after ensuring that we will actually send the
1515  * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1516  * transactions.
1517  */
1518  if (txndata && !txndata->sent_begin_txn)
1519  pgoutput_send_begin(ctx, txn);
1520 
1521  /*
1522  * Schema should be sent using the original relation because it also sends
1523  * the ancestor's relation.
1524  */
1525  maybe_send_schema(ctx, change, relation, relentry);
1526 
1527  OutputPluginPrepareWrite(ctx, true);
1528 
1529  /* Send the data */
1530  switch (action)
1531  {
1533  logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1534  data->binary, relentry->columns);
1535  break;
1537  logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1538  new_slot, data->binary, relentry->columns);
1539  break;
1541  logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1542  data->binary, relentry->columns);
1543  break;
1544  default:
1545  Assert(false);
1546  }
1547 
1548  OutputPluginWrite(ctx, true);
1549 
1550 cleanup:
1551  if (RelationIsValid(ancestor))
1552  {
1553  RelationClose(ancestor);
1554  ancestor = NULL;
1555  }
1556 
1557  /* Drop the new slots that were used to store the converted tuples. */
1558  if (relentry->attrmap)
1559  {
1560  if (old_slot)
1561  ExecDropSingleTupleTableSlot(old_slot);
1562 
1563  if (new_slot)
1564  ExecDropSingleTupleTableSlot(new_slot);
1565  }
1566 
1567  MemoryContextSwitchTo(old);
1568  MemoryContextReset(data->context);
1569 }
static void cleanup(void)
Definition: bootstrap.c:681
#define DEBUG1
Definition: elog.h:30
TupleTableSlot * MakeTupleTableSlot(TupleDesc tupleDesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1199
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
TupleTableSlot * ExecStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1439
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
bool is_publishable_relation(Relation rel)
static void pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:562
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Relation relation)
Definition: pgoutput.c:1973
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:679
static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
Definition: pgoutput.c:1222
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
Definition: proto.c:458
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
Definition: proto.c:414
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns)
Definition: proto.c:533
#define RelationIsValid(relation)
Definition: rel.h:478
ReorderBufferChangeType
Definition: reorderbuffer.h:51
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:52
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:53
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
union ReorderBufferChange::@107 data
struct ReorderBufferChange::@107::@108 tp
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192

References generate_unaccent_rules::action, ReorderBufferChange::action, Assert, RelationSyncEntry::attrmap, cleanup(), RelationSyncEntry::columns, ReorderBufferChange::data, data, DEBUG1, elog, ExecDropSingleTupleTableSlot(), ExecStoreHeapTuple(), execute_attr_map_slot(), get_rel_sync_entry(), InvalidTransactionId, is_publishable_relation(), logicalrep_write_delete(), logicalrep_write_insert(), logicalrep_write_update(), MakeTupleTableSlot(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, RelationSyncEntry::old_slot, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_row_filter(), pgoutput_send_begin(), RelationSyncEntry::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, RelationSyncEntry::publish_as_relid, PublicationActions::pubupdate, RelationData::rd_rel, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), RelationIsValid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, TTSOpsVirtual, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_column_list_init()

static void pgoutput_column_list_init ( PGOutputData data,
List publications,
RelationSyncEntry entry 
)
static

Definition at line 1015 of file pgoutput.c.

1017 {
1018  ListCell *lc;
1019  bool first = true;
1020  Relation relation = RelationIdGetRelation(entry->publish_as_relid);
1021 
1022  /*
1023  * Find if there are any column lists for this relation. If there are,
1024  * build a bitmap using the column lists.
1025  *
1026  * Multiple publications might have multiple column lists for this
1027  * relation.
1028  *
1029  * Note that we don't support the case where the column list is different
1030  * for the same table when combining publications. See comments atop
1031  * fetch_table_list. But one can later change the publication so we still
1032  * need to check all the given publication-table mappings and report an
1033  * error if any publications have a different column list.
1034  *
1035  * FOR ALL TABLES and FOR TABLES IN SCHEMA imply "don't use column list".
1036  */
1037  foreach(lc, publications)
1038  {
1039  Publication *pub = lfirst(lc);
1040  HeapTuple cftuple = NULL;
1041  Datum cfdatum = 0;
1042  Bitmapset *cols = NULL;
1043 
1044  /*
1045  * If the publication is FOR ALL TABLES then it is treated the same as
1046  * if there are no column lists (even if other publications have a
1047  * list).
1048  */
1049  if (!pub->alltables)
1050  {
1051  bool pub_no_list = true;
1052 
1053  /*
1054  * Check for the presence of a column list in this publication.
1055  *
1056  * Note: If we find no pg_publication_rel row, it's a publication
1057  * defined for a whole schema, so it can't have a column list,
1058  * just like a FOR ALL TABLES publication.
1059  */
1060  cftuple = SearchSysCache2(PUBLICATIONRELMAP,
1062  ObjectIdGetDatum(pub->oid));
1063 
1064  if (HeapTupleIsValid(cftuple))
1065  {
1066  /* Lookup the column list attribute. */
1067  cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
1068  Anum_pg_publication_rel_prattrs,
1069  &pub_no_list);
1070 
1071  /* Build the column list bitmap in the per-entry context. */
1072  if (!pub_no_list) /* when not null */
1073  {
1074  int i;
1075  int nliveatts = 0;
1076  TupleDesc desc = RelationGetDescr(relation);
1077 
1079 
1080  cols = pub_collist_to_bitmapset(cols, cfdatum,
1081  entry->entry_cxt);
1082 
1083  /* Get the number of live attributes. */
1084  for (i = 0; i < desc->natts; i++)
1085  {
1086  Form_pg_attribute att = TupleDescAttr(desc, i);
1087 
1088  if (att->attisdropped || att->attgenerated)
1089  continue;
1090 
1091  nliveatts++;
1092  }
1093 
1094  /*
1095  * If column list includes all the columns of the table,
1096  * set it to NULL.
1097  */
1098  if (bms_num_members(cols) == nliveatts)
1099  {
1100  bms_free(cols);
1101  cols = NULL;
1102  }
1103  }
1104 
1105  ReleaseSysCache(cftuple);
1106  }
1107  }
1108 
1109  if (first)
1110  {
1111  entry->columns = cols;
1112  first = false;
1113  }
1114  else if (!bms_equal(entry->columns, cols))
1115  ereport(ERROR,
1116  errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1117  errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1119  RelationGetRelationName(relation)));
1120  } /* loop all subscribed publications */
1121 
1122  RelationClose(relation);
1123 }
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:142
int bms_num_members(const Bitmapset *a)
Definition: bitmapset.c:751
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
int i
Definition: isn.c:73
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
static void pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
Definition: pgoutput.c:847
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationGetNamespace(relation)
Definition: rel.h:546
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:266
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:479
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:229
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92

References Publication::alltables, bms_equal(), bms_free(), bms_num_members(), RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, ereport, errcode(), errmsg(), ERROR, get_namespace_name(), HeapTupleIsValid, i, lfirst, TupleDescData::natts, ObjectIdGetDatum(), Publication::oid, pgoutput_ensure_entry_cxt(), pub_collist_to_bitmapset(), RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetDescr, RelationGetNamespace, RelationGetRelationName, RelationIdGetRelation(), ReleaseSysCache(), SearchSysCache2(), SysCacheGetAttr(), and TupleDescAttr.

Referenced by get_rel_sync_entry().

◆ pgoutput_commit_prepared_txn()

static void pgoutput_commit_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 647 of file pgoutput.c.

649 {
650  OutputPluginUpdateProgress(ctx, false);
651 
652  OutputPluginPrepareWrite(ctx, true);
653  logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
654  OutputPluginWrite(ctx, true);
655 }
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact)
Definition: logical.c:735
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:248

References logicalrep_write_commit_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_commit_txn()

static void pgoutput_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 584 of file pgoutput.c.

586 {
588  bool sent_begin_txn;
589 
590  Assert(txndata);
591 
592  /*
593  * We don't need to send the commit message unless some relevant change
594  * from this transaction has been sent to the downstream.
595  */
596  sent_begin_txn = txndata->sent_begin_txn;
597  OutputPluginUpdateProgress(ctx, !sent_begin_txn);
598  pfree(txndata);
599  txn->output_plugin_private = NULL;
600 
601  if (!sent_begin_txn)
602  {
603  elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
604  return;
605  }
606 
607  OutputPluginPrepareWrite(ctx, true);
608  logicalrep_write_commit(ctx->out, txn, commit_lsn);
609  OutputPluginWrite(ctx, true);
610 }
void pfree(void *pointer)
Definition: mcxt.c:1521
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:89
bool sent_begin_txn
Definition: pgoutput.c:206

References Assert, DEBUG1, elog, logicalrep_write_commit(), LogicalDecodingContext::out, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), pfree(), PGOutputTxnData::sent_begin_txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_ensure_entry_cxt()

static void pgoutput_ensure_entry_cxt ( PGOutputData data,
RelationSyncEntry entry 
)
static

Definition at line 847 of file pgoutput.c.

848 {
849  Relation relation;
850 
851  /* The context may already exist, in which case bail out. */
852  if (entry->entry_cxt)
853  return;
854 
855  relation = RelationIdGetRelation(entry->publish_as_relid);
856 
857  entry->entry_cxt = AllocSetContextCreate(data->cachectx,
858  "entry private context",
860 
862  RelationGetRelationName(relation));
863 }
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:170
#define MemoryContextCopyAndSetIdentifier(cxt, id)
Definition: memutils.h:101

References ALLOCSET_SMALL_SIZES, AllocSetContextCreate, data, RelationSyncEntry::entry_cxt, MemoryContextCopyAndSetIdentifier, RelationSyncEntry::publish_as_relid, RelationGetRelationName, and RelationIdGetRelation().

Referenced by pgoutput_column_list_init(), and pgoutput_row_filter_init().

◆ pgoutput_message()

static void pgoutput_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 1640 of file pgoutput.c.

1643 {
1646 
1647  if (!data->messages)
1648  return;
1649 
1650  /*
1651  * Remember the xid for the message in streaming mode. See
1652  * pgoutput_change.
1653  */
1654  if (data->in_streaming)
1655  xid = txn->xid;
1656 
1657  /*
1658  * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1659  */
1660  if (transactional)
1661  {
1663 
1664  /* Send BEGIN if we haven't yet */
1665  if (txndata && !txndata->sent_begin_txn)
1666  pgoutput_send_begin(ctx, txn);
1667  }
1668 
1669  OutputPluginPrepareWrite(ctx, true);
1671  xid,
1672  message_lsn,
1673  transactional,
1674  prefix,
1675  sz,
1676  message);
1677  OutputPluginWrite(ctx, true);
1678 }
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:643

References data, if(), InvalidTransactionId, logicalrep_write_message(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_send_begin(), PGOutputTxnData::sent_begin_txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_origin_filter()

static bool pgoutput_origin_filter ( LogicalDecodingContext ctx,
RepOriginId  origin_id 
)
static

Definition at line 1685 of file pgoutput.c.

1687 {
1689 
1690  if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1691  return true;
1692 
1693  return false;
1694 }

References data, if(), InvalidRepOriginId, and LogicalDecodingContext::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pgoutput_prepare_txn()

static void pgoutput_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 633 of file pgoutput.c.

635 {
636  OutputPluginUpdateProgress(ctx, false);
637 
638  OutputPluginPrepareWrite(ctx, true);
639  logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
640  OutputPluginWrite(ctx, true);
641 }
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:198

References logicalrep_write_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_rollback_prepared_txn()

static void pgoutput_rollback_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)
static

Definition at line 661 of file pgoutput.c.

665 {
666  OutputPluginUpdateProgress(ctx, false);
667 
668  OutputPluginPrepareWrite(ctx, true);
669  logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
670  prepare_time);
671  OutputPluginWrite(ctx, true);
672 }
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: proto.c:304

References logicalrep_write_rollback_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_row_filter()

static bool pgoutput_row_filter ( Relation  relation,
TupleTableSlot old_slot,
TupleTableSlot **  new_slot_ptr,
RelationSyncEntry entry,
ReorderBufferChangeType action 
)
static

Definition at line 1222 of file pgoutput.c.

1225 {
1226  TupleDesc desc;
1227  int i;
1228  bool old_matched,
1229  new_matched,
1230  result;
1231  TupleTableSlot *tmp_new_slot;
1232  TupleTableSlot *new_slot = *new_slot_ptr;
1233  ExprContext *ecxt;
1234  ExprState *filter_exprstate;
1235 
1236  /*
1237  * We need this map to avoid relying on ReorderBufferChangeType enums
1238  * having specific values.
1239  */
1240  static const int map_changetype_pubaction[] = {
1244  };
1245 
1249 
1250  Assert(new_slot || old_slot);
1251 
1252  /* Get the corresponding row filter */
1253  filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1254 
1255  /* Bail out if there is no row filter */
1256  if (!filter_exprstate)
1257  return true;
1258 
1259  elog(DEBUG3, "table \"%s.%s\" has row filter",
1261  RelationGetRelationName(relation));
1262 
1264 
1265  ecxt = GetPerTupleExprContext(entry->estate);
1266 
1267  /*
1268  * For the following occasions where there is only one tuple, we can
1269  * evaluate the row filter for that tuple and return.
1270  *
1271  * For inserts, we only have the new tuple.
1272  *
1273  * For updates, we can have only a new tuple when none of the replica
1274  * identity columns changed and none of those columns have external data
1275  * but we still need to evaluate the row filter for the new tuple as the
1276  * existing values of those columns might not match the filter. Also,
1277  * users can use constant expressions in the row filter, so we anyway need
1278  * to evaluate it for the new tuple.
1279  *
1280  * For deletes, we only have the old tuple.
1281  */
1282  if (!new_slot || !old_slot)
1283  {
1284  ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1285  result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1286 
1287  return result;
1288  }
1289 
1290  /*
1291  * Both the old and new tuples must be valid only for updates and need to
1292  * be checked against the row filter.
1293  */
1294  Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1295 
1296  slot_getallattrs(new_slot);
1297  slot_getallattrs(old_slot);
1298 
1299  tmp_new_slot = NULL;
1300  desc = RelationGetDescr(relation);
1301 
1302  /*
1303  * The new tuple might not have all the replica identity columns, in which
1304  * case it needs to be copied over from the old tuple.
1305  */
1306  for (i = 0; i < desc->natts; i++)
1307  {
1308  Form_pg_attribute att = TupleDescAttr(desc, i);
1309 
1310  /*
1311  * if the column in the new tuple or old tuple is null, nothing to do
1312  */
1313  if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1314  continue;
1315 
1316  /*
1317  * Unchanged toasted replica identity columns are only logged in the
1318  * old tuple. Copy this over to the new tuple. The changed (or WAL
1319  * Logged) toast values are always assembled in memory and set as
1320  * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1321  */
1322  if (att->attlen == -1 &&
1323  VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
1324  !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
1325  {
1326  if (!tmp_new_slot)
1327  {
1328  tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1329  ExecClearTuple(tmp_new_slot);
1330 
1331  memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1332  desc->natts * sizeof(Datum));
1333  memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1334  desc->natts * sizeof(bool));
1335  }
1336 
1337  tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1338  tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1339  }
1340  }
1341 
1342  ecxt->ecxt_scantuple = old_slot;
1343  old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1344 
1345  if (tmp_new_slot)
1346  {
1347  ExecStoreVirtualTuple(tmp_new_slot);
1348  ecxt->ecxt_scantuple = tmp_new_slot;
1349  }
1350  else
1351  ecxt->ecxt_scantuple = new_slot;
1352 
1353  new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1354 
1355  /*
1356  * Case 1: if both tuples don't match the row filter, bailout. Send
1357  * nothing.
1358  */
1359  if (!old_matched && !new_matched)
1360  return false;
1361 
1362  /*
1363  * Case 2: if the old tuple doesn't satisfy the row filter but the new
1364  * tuple does, transform the UPDATE into INSERT.
1365  *
1366  * Use the newly transformed tuple that must contain the column values for
1367  * all the replica identity columns. This is required to ensure that,
1368  * while inserting the tuple in the downstream node, we have all the
1369  * required column values.
1370  */
1371  if (!old_matched && new_matched)
1372  {
1374 
1375  if (tmp_new_slot)
1376  *new_slot_ptr = tmp_new_slot;
1377  }
1378 
1379  /*
1380  * Case 3: if the old tuple satisfies the row filter but the new tuple
1381  * doesn't, transform the UPDATE into DELETE.
1382  *
1383  * This transformation does not require another tuple. The Old tuple will
1384  * be used for DELETE.
1385  */
1386  else if (old_matched && !new_matched)
1388 
1389  /*
1390  * Case 4: if both tuples match the row filter, transformation isn't
1391  * required. (*action is default UPDATE).
1392  */
1393 
1394  return true;
1395 }
#define DEBUG3
Definition: elog.h:28
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1639
#define ResetPerTupleExprContext(estate)
Definition: executor.h:570
#define GetPerTupleExprContext(estate)
Definition: executor.h:561
static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
Definition: pgoutput.c:824
TupleTableSlot * ecxt_scantuple
Definition: execnodes.h:258
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:368
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290

References generate_unaccent_rules::action, Assert, DEBUG3, ExprContext::ecxt_scantuple, elog, RelationSyncEntry::estate, ExecClearTuple(), ExecStoreVirtualTuple(), RelationSyncEntry::exprstate, get_namespace_name(), GetPerTupleExprContext, i, MakeSingleTupleTableSlot(), TupleDescData::natts, pgoutput_row_filter_exec_expr(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ResetPerTupleExprContext, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, TTSOpsVirtual, TupleDescAttr, and VARATT_IS_EXTERNAL_ONDISK.

Referenced by pgoutput_change().

◆ pgoutput_row_filter_exec_expr()

static bool pgoutput_row_filter_exec_expr ( ExprState *state,
ExprContext *econtext 
)
static

Definition at line 824 of file pgoutput.c.

/*
 * Evaluate a prepared row-filter expression against the given expression
 * context and reduce the outcome to a plain boolean. A NULL result is
 * treated as "does not match" (false). "state" must not be NULL (asserted).
 */
825 {
826  Datum ret;
827  bool isnull;
828 
829  Assert(state != NULL);
830 
831  ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
832 
 /* Log the effective outcome; a NULL result is reported as "false". */
833  elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
834  isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
835  isnull ? "true" : "false");
836 
 /* NULL never satisfies the filter. */
837  if (isnull)
838  return false;
839 
840  return DatumGetBool(ret);
841 }
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:359
static bool DatumGetBool(Datum X)
Definition: postgres.h:90
Definition: regguts.h:323

References Assert, DatumGetBool(), DEBUG3, elog, and ExecEvalExprSwitchContext().

Referenced by pgoutput_row_filter().

◆ pgoutput_row_filter_init()

static void pgoutput_row_filter_init ( PGOutputData data,
List publications,
RelationSyncEntry entry 
)
static

Definition at line 869 of file pgoutput.c.

871 {
872  ListCell *lc;
873  List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
874  bool no_filter[] = {false, false, false}; /* One per pubaction */
875  MemoryContext oldctx;
876  int idx;
877  bool has_filter = true;
878  Oid schemaid = get_rel_namespace(entry->publish_as_relid);
879 
880  /*
881  * Find if there are any row filters for this relation. If there are, then
882  * prepare the necessary ExprState and cache it in entry->exprstate. To
883  * build an expression state, we need to ensure the following:
884  *
885  * All the given publication-table mappings must be checked.
886  *
887  * Multiple publications might have multiple row filters for this
888  * relation. Since row filter usage depends on the DML operation, there
889  * are multiple lists (one for each operation) to which row filters will
890  * be appended.
891  *
892  * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
893  * expression" so it takes precedence.
894  */
895  foreach(lc, publications)
896  {
897  Publication *pub = lfirst(lc);
898  HeapTuple rftuple = NULL;
899  Datum rfdatum = 0;
900  bool pub_no_filter = true;
901 
902  /*
903  * If the publication is FOR ALL TABLES, or the publication includes a
904  * FOR TABLES IN SCHEMA where the table belongs to the referred
905  * schema, then it is treated the same as if there are no row filters
906  * (even if other publications have a row filter).
907  */
908  if (!pub->alltables &&
909  !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
910  ObjectIdGetDatum(schemaid),
911  ObjectIdGetDatum(pub->oid)))
912  {
913  /*
914  * Check for the presence of a row filter in this publication.
915  */
916  rftuple = SearchSysCache2(PUBLICATIONRELMAP,
917  ObjectIdGetDatum(entry->publish_as_relid),
918  ObjectIdGetDatum(pub->oid));
919 
920  if (HeapTupleIsValid(rftuple))
921  {
922  /* Null indicates no filter. */
923  rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
924  Anum_pg_publication_rel_prqual,
925  &pub_no_filter);
926  }
927  }
928 
929  if (pub_no_filter)
930  {
931  if (rftuple)
932  ReleaseSysCache(rftuple);
933 
934  no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
935  no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
936  no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
937 
938  /*
939  * Quick exit if all the DML actions are publicized via this
940  * publication.
941  */
942  if (no_filter[PUBACTION_INSERT] &&
943  no_filter[PUBACTION_UPDATE] &&
944  no_filter[PUBACTION_DELETE])
945  {
946  has_filter = false;
947  break;
948  }
949 
950  /* No additional work for this publication. Next one. */
951  continue;
952  }
953 
954  /* Form the per pubaction row filter lists. */
955  if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
956  rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
957  TextDatumGetCString(rfdatum));
958  if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
959  rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
960  TextDatumGetCString(rfdatum));
961  if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
962  rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
963  TextDatumGetCString(rfdatum));
964 
965  ReleaseSysCache(rftuple);
966  } /* loop all subscribed publications */
967 
968  /* Clean the row filter */
969  for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
970  {
971  if (no_filter[idx])
972  {
973  list_free_deep(rfnodes[idx]);
974  rfnodes[idx] = NIL;
975  }
976  }
977 
978  if (has_filter)
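979  {
980  Relation relation = RelationIdGetRelation(entry->publish_as_relid);
981 
982  pgoutput_ensure_entry_cxt(data, entry);
983 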
984  /*
985  * Now all the filters for all pubactions are known. Combine them when
986  * their pubactions are the same.
987  */
988  oldctx = MemoryContextSwitchTo(entry->entry_cxt);
989  entry->estate = create_estate_for_relation(relation);
990  for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
991  {
992  List *filters = NIL;
993  Expr *rfnode;
994 
995  if (rfnodes[idx] == NIL)
996  continue;
997 
998  foreach(lc, rfnodes[idx])
999  filters = lappend(filters, stringToNode((char *) lfirst(lc)));
1000 
1001  /* combine the row filter and cache the ExprState */
1002  rfnode = make_orclause(filters);
1003  entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1004  } /* for each pubaction */
1005  MemoryContextSwitchTo(oldctx);
1006 
1007  RelationClose(relation);
1008  }
1009 }
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:259
#define TextDatumGetCString(d)
Definition: builtins.h:98
ExprState * ExecPrepareExpr(Expr *node, EState *estate)
Definition: execExpr.c:743
Expr * make_orclause(List *orclauses)
Definition: makefuncs.c:670
#define NUM_ROWFILTER_PUBACTIONS
Definition: pgoutput.c:105
static EState * create_estate_for_relation(Relation rel)
Definition: pgoutput.c:794
void * stringToNode(const char *str)
Definition: read.c:90
#define SearchSysCacheExists2(cacheId, key1, key2)
Definition: syscache.h:97

References Publication::alltables, create_estate_for_relation(), data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecPrepareExpr(), RelationSyncEntry::exprstate, get_rel_namespace(), HeapTupleIsValid, idx(), lappend(), lfirst, list_free_deep(), make_orclause(), MemoryContextSwitchTo(), NIL, NUM_ROWFILTER_PUBACTIONS, ObjectIdGetDatum(), Publication::oid, pgoutput_ensure_entry_cxt(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, RelationSyncEntry::publish_as_relid, PublicationActions::pubupdate, RelationClose(), RelationIdGetRelation(), ReleaseSysCache(), SearchSysCache2(), SearchSysCacheExists2, stringToNode(), SysCacheGetAttr(), and TextDatumGetCString.

Referenced by get_rel_sync_entry().

◆ pgoutput_send_begin()

static void pgoutput_send_begin ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 562 of file pgoutput.c.

563 {
564  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
565  PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
566 
567  Assert(txndata);
568  Assert(!txndata->sent_begin_txn);
569 
570  OutputPluginPrepareWrite(ctx, !send_replication_origin);
571  logicalrep_write_begin(ctx->out, txn);
572  txndata->sent_begin_txn = true;
573 
574  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
575  send_replication_origin);
576 
577  OutputPluginWrite(ctx, true);
578 }
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:60

References Assert, InvalidRepOriginId, logicalrep_write_begin(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), send_repl_origin(), and PGOutputTxnData::sent_begin_txn.

Referenced by pgoutput_change(), pgoutput_message(), and pgoutput_truncate().

◆ pgoutput_shutdown()

static void pgoutput_shutdown ( LogicalDecodingContext *ctx)
static

Definition at line 1704 of file pgoutput.c.

1705 {
1706  if (RelationSyncCache)
1707  {
1708  hash_destroy(RelationSyncCache);
1709  RelationSyncCache = NULL;
1710  }
1711 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865

References hash_destroy(), and RelationSyncCache.

Referenced by _PG_output_plugin_init().

◆ pgoutput_startup()

static void pgoutput_startup ( LogicalDecodingContext *ctx,
OutputPluginOptions *opt,
bool  is_init 
)
static

Definition at line 418 of file pgoutput.c.

420 {
421  PGOutputData *data = palloc0(sizeof(PGOutputData));
422  static bool publication_callback_registered = false;
423 
424  /* Create our memory context for private allocations. */
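425  data->context = AllocSetContextCreate(ctx->context,
426  "logical replication output context",
427  ALLOCSET_DEFAULT_SIZES);
428 
429  data->cachectx = AllocSetContextCreate(ctx->context,
430  "logical replication cache context",
431  ALLOCSET_DEFAULT_SIZES);
432 
433  ctx->output_plugin_private = data;
434 
435  /* This plugin uses binary protocol. */
436  opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
437 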
438  /*
439  * This is replication start and not slot initialization.
440  *
441  * Parse and validate options passed by the client.
442  */
443  if (!is_init)
444  {
445  /* Parse the params and ERROR if we see any we don't recognize */
446  parse_output_parameters(ctx->output_plugin_options, data);
447 
448  /* Check if we support requested protocol */
449  if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
450  ereport(ERROR,
451  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
452  errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
453  data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
454 
455  if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
456  ereport(ERROR,
457  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
458  errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
459  data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
460 
461  /*
462  * Decide whether to enable streaming. It is disabled by default, in
463  * which case we just update the flag in decoding context. Otherwise
464  * we only allow it with sufficient version of the protocol, and when
465  * the output plugin supports it.
466  */
467  if (data->streaming == LOGICALREP_STREAM_OFF)
468  ctx->streaming = false;
469  else if (data->streaming == LOGICALREP_STREAM_ON &&
470  data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
471  ereport(ERROR,
472  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
473  errmsg("requested proto_version=%d does not support streaming, need %d or higher",
474  data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
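475  else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
476  data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
477  ereport(ERROR,
478  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
479  errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
480  data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));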
481  else if (!ctx->streaming)
482  ereport(ERROR,
483  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
484  errmsg("streaming requested, but not supported by output plugin")));
485 
486  /*
487  * Here, we just check whether the two-phase option is passed by
488  * plugin and decide whether to enable it at later point of time. It
489  * remains enabled if the previous start-up has done so. But we only
490  * allow the option to be passed in with sufficient version of the
491  * protocol, and when the output plugin supports it.
492  */
493  if (!data->two_phase)
494  ctx->twophase_opt_given = false;
495  else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
496  ereport(ERROR,
497  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
498  errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
499  data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
500  else if (!ctx->twophase)
501  ereport(ERROR,
502  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
503  errmsg("two-phase commit requested, but not supported by output plugin")));
504  else
505  ctx->twophase_opt_given = true;
506 
507  /* Init publication state. */
508  data->publications = NIL;
509  publications_valid = false;
510 
511  /*
512  * Register callback for pg_publication if we didn't already do that
513  * during some previous call in this process.
514  */
515  if (!publication_callback_registered)
516  {
517  CacheRegisterSyscacheCallback(PUBLICATIONOID,
518  publication_invalidation_cb,
519  (Datum) 0);
520  publication_callback_registered = true;
521  }
522 
523  /* Initialize relation schema cache. */
524  init_rel_sync_cache(CacheMemoryContext);
525  }
526  else
527  {
528  /*
529  * Disable the streaming and prepared transactions during the slot
530  * initialization mode.
531  */
532  ctx->streaming = false;
533  ctx->twophase = false;
534  }
535 }
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:40
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_MAX_VERSION_NUM
Definition: logicalproto.h:45
void * palloc0(Size size)
Definition: mcxt.c:1347
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
#define LOGICALREP_STREAM_ON
#define LOGICALREP_STREAM_PARALLEL
static void parse_output_parameters(List *options, PGOutputData *data)
Definition: pgoutput.c:276
static void init_rel_sync_cache(MemoryContext cachectx)
Definition: pgoutput.c:1888
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1739
List * output_plugin_options
Definition: logical.h:59
OutputPluginOutputType output_type
Definition: output_plugin.h:28

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CacheMemoryContext, CacheRegisterSyscacheCallback(), LogicalDecodingContext::context, data, ereport, errcode(), errmsg(), ERROR, init_rel_sync_cache(), LOGICALREP_PROTO_MAX_VERSION_NUM, LOGICALREP_PROTO_MIN_VERSION_NUM, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, LOGICALREP_STREAM_OFF, LOGICALREP_STREAM_ON, LOGICALREP_STREAM_PARALLEL, NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OutputPluginOptions::output_type, palloc0(), parse_output_parameters(), publication_invalidation_cb(), publications_valid, LogicalDecodingContext::streaming, LogicalDecodingContext::twophase, and LogicalDecodingContext::twophase_opt_given.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_abort()

static void pgoutput_stream_abort ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 1807 of file pgoutput.c.

1810 {
1811  ReorderBufferTXN *toptxn;
1812  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1813  bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1814 
1815  /*
1816  * The abort should happen outside streaming block, even for streamed
1817  * transactions. The transaction has to be marked as streamed, though.
1818  */
1819  Assert(!data->in_streaming);
1820 
1821  /* determine the toplevel transaction */
1822  toptxn = rbtxn_get_toptxn(txn);
1823 
1824  Assert(rbtxn_is_streamed(toptxn));
1825 
1826  OutputPluginPrepareWrite(ctx, true);
1827  logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1828  txn->xact_time.abort_time, write_abort_info);
1829 
1830  OutputPluginWrite(ctx, true);
1831 
1832  cleanup_rel_sync_cache(toptxn->xid, false);
1833 }
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:2242
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
Definition: proto.c:1166
#define rbtxn_is_streamed(txn)
union ReorderBufferTXN::@113 xact_time
TimestampTz abort_time

References ReorderBufferTXN::abort_time, Assert, cleanup_rel_sync_cache(), data, LOGICALREP_STREAM_PARALLEL, logicalrep_write_stream_abort(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_get_toptxn, rbtxn_is_streamed, ReorderBufferTXN::xact_time, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_commit()

static void pgoutput_stream_commit ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 1840 of file pgoutput.c.

1843 {
1844  PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
1845 
1846  /*
1847  * The commit should happen outside streaming block, even for streamed
1848  * transactions. The transaction has to be marked as streamed, though.
1849  */
1850  Assert(!data->in_streaming);
1851  Assert(rbtxn_is_streamed(txn));
1852 
1853  OutputPluginUpdateProgress(ctx, false);
1854 
1855  OutputPluginPrepareWrite(ctx, true);
1856  logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1857  OutputPluginWrite(ctx, true);
1858 
1859  cleanup_rel_sync_cache(txn->xid, true);
1860 }
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:182
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:1112

References Assert, cleanup_rel_sync_cache(), data, logicalrep_write_stream_commit(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), PG_USED_FOR_ASSERTS_ONLY, rbtxn_is_streamed, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_prepare_txn()

static void pgoutput_stream_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 1868 of file pgoutput.c.

/*
 * Emit the PREPARE message of a streamed transaction. The transaction is
 * asserted to be streamed. Progress is updated first, then the message
 * produced by logicalrep_write_stream_prepare() is sent as a single
 * OutputPluginPrepareWrite()/OutputPluginWrite() pair.
 */
1871 {
1872  Assert(rbtxn_is_streamed(txn));
1873 
1874  OutputPluginUpdateProgress(ctx, false);
1875  OutputPluginPrepareWrite(ctx, true);
1876  logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1877  OutputPluginWrite(ctx, true);
1878 }
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:364

References Assert, logicalrep_write_stream_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), and rbtxn_is_streamed.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_start()

static void pgoutput_stream_start ( struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn 
)
static

Definition at line 1754 of file pgoutput.c.

1756 {
1757  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1758  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1759 
1760  /* we can't nest streaming of transactions */
1761  Assert(!data->in_streaming);
1762 
1763  /*
1764  * If we already sent the first stream for this transaction then don't
1765  * send the origin id in the subsequent streams.
1766  */
1767  if (rbtxn_is_streamed(txn))
1768  send_replication_origin = false;
1769 
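1770  OutputPluginPrepareWrite(ctx, !send_replication_origin);
1771  logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1772 
1773  send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1774  send_replication_origin);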
1775 
1776  OutputPluginWrite(ctx, true);
1777 
1778  /* we're streaming a chunk of transaction now */
1779  data->in_streaming = true;
1780 }
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:1069
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert, data, InvalidRepOriginId, InvalidXLogRecPtr, logicalrep_write_stream_start(), ReorderBufferTXN::origin_id, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, send_repl_origin(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_stop()

static void pgoutput_stream_stop ( struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn 
)
static

Definition at line 1786 of file pgoutput.c.

1788 {
1789  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1790 
1791  /* we should be streaming a transaction */
1792  Assert(data->in_streaming);
1793 
1794  OutputPluginPrepareWrite(ctx, true);
1795  logicalrep_write_stream_stop(ctx->out);
1796  OutputPluginWrite(ctx, true);
1797 
1798  /* we've stopped streaming a transaction */
1799  data->in_streaming = false;
1800 }
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:1103

References Assert, data, logicalrep_write_stream_stop(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_truncate()

static void pgoutput_truncate ( LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange *change 
)
static

Definition at line 1572 of file pgoutput.c.

1574 {
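1575  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1576  PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1577  MemoryContext old;
1578  RelationSyncEntry *relentry;
1579  int i;
1580  int nrelids;
1581  Oid *relids;
1582  TransactionId xid = InvalidTransactionId;
1583 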
1584  /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1585  if (data->in_streaming)
1586  xid = change->txn->xid;
1587 
1588  old = MemoryContextSwitchTo(data->context);
1589 
1590  relids = palloc0(nrelations * sizeof(Oid));
1591  nrelids = 0;
1592 
1593  for (i = 0; i < nrelations; i++)
1594  {
1595  Relation relation = relations[i];
1596  Oid relid = RelationGetRelid(relation);
1597 
1598  if (!is_publishable_relation(relation))
1599  continue;
1600 
1601  relentry = get_rel_sync_entry(data, relation);
1602 
1603  if (!relentry->pubactions.pubtruncate)
1604  continue;
1605 
1606  /*
1607  * Don't send partitions if the publication wants to send only the
1608  * root tables through it.
1609  */
1610  if (relation->rd_rel->relispartition &&
1611  relentry->publish_as_relid != relid)
1612  continue;
1613 
1614  relids[nrelids++] = relid;
1615 
1616  /* Send BEGIN if we haven't yet */
1617  if (txndata && !txndata->sent_begin_txn)
1618  pgoutput_send_begin(ctx, txn);
1619 
1620  maybe_send_schema(ctx, change, relation, relentry);
1621  }
1622 
1623  if (nrelids > 0)
1624  {
1625  OutputPluginPrepareWrite(ctx, true);
1626  logicalrep_write_truncate(ctx->out,
1627  xid,
1628  nrelids,
1629  relids,
1630  change->data.truncate.cascade,
1631  change->data.truncate.restart_seqs);
1632  OutputPluginWrite(ctx, true);
1633  }
1634 
1635  MemoryContextSwitchTo(old);
1636  MemoryContextReset(data->context);
1637 }
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:586
struct ReorderBufferChange::@107::@109 truncate

References ReorderBufferChange::data, data, get_rel_sync_entry(), i, InvalidTransactionId, is_publishable_relation(), logicalrep_write_truncate(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), palloc0(), pgoutput_send_begin(), RelationSyncEntry::pubactions, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, RelationData::rd_rel, RelationGetRelid, ReorderBufferChange::truncate, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ publication_invalidation_cb()

static void publication_invalidation_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1739 of file pgoutput.c.

/*
 * Invalidation callback for publication changes: flags the cached
 * publication list as stale (publications_valid = false) and forwards the
 * same arguments to rel_sync_cache_publication_cb().
 */
1740 {
1741  publications_valid = false;
1742 
1743  /*
1744  * Also invalidate per-relation cache so that next time the filtering info
1745  * is checked it will be updated with the new publication settings.
1746  */
1747  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
1748 }
void * arg

References arg, publications_valid, and rel_sync_cache_publication_cb().

Referenced by pgoutput_startup().

◆ rel_sync_cache_publication_cb()

static void rel_sync_cache_publication_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 2328 of file pgoutput.c.

/*
 * Invalidation callback that clears replicate_valid on every entry in
 * RelationSyncCache. Does nothing when the cache does not currently exist.
 * The arg, cacheid and hashvalue parameters are not used in this body.
 */
2329 {
2330  HASH_SEQ_STATUS status;
2331  RelationSyncEntry *entry;
2332 
2333  /*
2334  * We can get here if the plugin was used in SQL interface as the
2335  * RelationSyncCache is destroyed when the decoding finishes, but there is
2336  * no way to unregister the invalidation callbacks.
2337  */
2338  if (RelationSyncCache == NULL)
2339  return;
2340 
2341  /*
2342  * We have no easy way to identify which cache entries this invalidation
2343  * event might have affected, so just mark them all invalid.
2344  */
2345  hash_seq_init(&status, RelationSyncCache);
2346  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2347  {
2348  entry->replicate_valid = false;
2349  }
2350 }

References hash_seq_init(), hash_seq_search(), RelationSyncCache, and RelationSyncEntry::replicate_valid.

Referenced by init_rel_sync_cache(), and publication_invalidation_cb().

◆ rel_sync_cache_relation_cb()

static void rel_sync_cache_relation_cb ( Datum  arg,
Oid  relid 
)
static

Definition at line 2277 of file pgoutput.c.

/*
 * Relation invalidation callback for RelationSyncCache.
 *
 * With a valid relid, only the entry for that relation (if present) has
 * replicate_valid cleared; with an invalid relid, every entry is cleared.
 * Entries are only flagged, never freed or modified otherwise. Does nothing
 * when the cache does not currently exist. The arg parameter is not used in
 * this body.
 */
2278 {
2279  RelationSyncEntry *entry;
2280 
2281  /*
2282  * We can get here if the plugin was used in SQL interface as the
2283  * RelationSyncCache is destroyed when the decoding finishes, but there is
2284  * no way to unregister the relcache invalidation callback.
2285  */
2286  if (RelationSyncCache == NULL)
2287  return;
2288 
2289  /*
2290  * Nobody keeps pointers to entries in this hash table around outside
2291  * logical decoding callback calls - but invalidation events can come in
2292  * *during* a callback if we do any syscache access in the callback.
2293  * Because of that we must mark the cache entry as invalid but not damage
2294  * any of its substructure here. The next get_rel_sync_entry() call will
2295  * rebuild it all.
2296  */
2297  if (OidIsValid(relid))
2298  {
2299  /*
2300  * Getting invalidations for relations that aren't in the table is
2301  * entirely normal. So we don't care if it's found or not.
2302  */
2303  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2304  HASH_FIND, NULL);
2305  if (entry != NULL)
2306  entry->replicate_valid = false;
2307  }
2308  else
2309  {
2310  /* Whole cache must be flushed. */
2311  HASH_SEQ_STATUS status;
2312 
2313  hash_seq_init(&status, RelationSyncCache);
2314  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2315  {
2316  entry->replicate_valid = false;
2317  }
2318  }
2319 }
#define OidIsValid(objectId)
Definition: c.h:775
@ HASH_FIND
Definition: hsearch.h:113

References HASH_FIND, hash_search(), hash_seq_init(), hash_seq_search(), OidIsValid, RelationSyncCache, and RelationSyncEntry::replicate_valid.

Referenced by init_rel_sync_cache().

◆ send_relation_and_attrs()

static void send_relation_and_attrs ( Relation  relation,
TransactionId  xid,
LogicalDecodingContext ctx,
Bitmapset columns 
)
static

Definition at line 750 of file pgoutput.c.

/*
 * Send the schema description of a relation.
 *
 * First writes one type message for each attribute that is not dropped, not
 * generated, has a type OID at or above FirstGenbkiObjectId and, when
 * "columns" is non-NULL, is a member of that set. Then writes the relation
 * message itself. Every message is wrapped in its own
 * OutputPluginPrepareWrite()/OutputPluginWrite() pair.
 */
753 {
754  TupleDesc desc = RelationGetDescr(relation);
755  int i;
756 
757  /*
758  * Write out type info if needed. We do that only for user-created types.
759  * We use FirstGenbkiObjectId as the cutoff, so that we only consider
760  * objects with hand-assigned OIDs to be "built in", not for instance any
761  * function or type defined in the information_schema. This is important
762  * because only hand-assigned OIDs can be expected to remain stable across
763  * major versions.
764  */
765  for (i = 0; i < desc->natts; i++)
766  {
767  Form_pg_attribute att = TupleDescAttr(desc, i);
768 
 /* Dropped and generated attributes are skipped. */
769  if (att->attisdropped || att->attgenerated)
770  continue;
771 
 /* Types below the cutoff are treated as built in; no type message. */
772  if (att->atttypid < FirstGenbkiObjectId)
773  continue;
774 
775  /* Skip this attribute if it's not present in the column list */
776  if (columns != NULL && !bms_is_member(att->attnum, columns))
777  continue;
778 
779  OutputPluginPrepareWrite(ctx, false);
780  logicalrep_write_typ(ctx->out, xid, att->atttypid);
781  OutputPluginWrite(ctx, false);
782  }
783 
 /* Finally, the relation message, written with the same column list. */
784  OutputPluginPrepareWrite(ctx, false);
785  logicalrep_write_rel(ctx->out, xid, relation, columns);
786  OutputPluginWrite(ctx, false);
787 }
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns)
Definition: proto.c:670
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:725
#define FirstGenbkiObjectId
Definition: transam.h:195

References bms_is_member(), FirstGenbkiObjectId, i, logicalrep_write_rel(), logicalrep_write_typ(), TupleDescData::natts, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetDescr, and TupleDescAttr.

Referenced by maybe_send_schema().

◆ send_repl_origin()

static void send_repl_origin ( LogicalDecodingContext ctx,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
bool  send_origin 
)
static

Definition at line 2354 of file pgoutput.c.

/*
 * Optionally append a replication-origin message to the output.
 *
 * Nothing happens unless send_origin is true and a name can be looked up
 * for origin_id (the lookup is made with missing_ok = true). In that case
 * OutputPluginWrite() is called for the message currently being written,
 * a new message is begun with OutputPluginPrepareWrite(), and the origin
 * name and origin_lsn are written into it. No OutputPluginWrite() follows
 * within this function; pgoutput_send_begin(), for example, calls it right
 * after this function returns.
 */
2356 {
2357  if (send_origin)
2358  {
2359  char *origin;
2360 
2361  /*----------
2362  * XXX: which behaviour do we want here?
2363  *
2364  * Alternatives:
2365  * - don't send origin message if origin name not found
2366  * (that's what we do now)
2367  * - throw error - that will break replication, not good
2368  * - send some special "unknown" origin
2369  *----------
2370  */
2371  if (replorigin_by_oid(origin_id, true, &origin))
2372  {
2373  /* Message boundary */
2374  OutputPluginWrite(ctx, false);
2375  OutputPluginPrepareWrite(ctx, true);
2376 
2377  logicalrep_write_origin(ctx->out, origin, origin_lsn);
2378  }
2379  }
2380 }
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:469
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:385

References logicalrep_write_origin(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and replorigin_by_oid().

Referenced by pgoutput_begin_prepare_txn(), pgoutput_send_begin(), and pgoutput_stream_start().

◆ set_schema_sent_in_streamed_txn()

static void set_schema_sent_in_streamed_txn ( RelationSyncEntry entry,
TransactionId  xid 
)
static

Definition at line 1952 of file pgoutput.c.

1953 {
1954  MemoryContext oldctx;
1955 
1956  oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1957 
1958  entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
1959 
1960  MemoryContextSwitchTo(oldctx);
1961 }
List * lappend_xid(List *list, TransactionId datum)
Definition: list.c:393

References CacheMemoryContext, lappend_xid(), MemoryContextSwitchTo(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 38 of file pgoutput.c.

◆ publications_valid

bool publications_valid
static

Definition at line 82 of file pgoutput.c.

Referenced by get_rel_sync_entry(), pgoutput_startup(), and publication_invalidation_cb().

◆ RelationSyncCache