PostgreSQL Source Code  git master
pgoutput.c File Reference
#include "postgres.h"
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "commands/defrem.h"
#include "executor/executor.h"
#include "fmgr.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
Include dependency graph for pgoutput.c:

Go to the source code of this file.

Data Structures

struct  RelationSyncEntry
 
struct  PGOutputTxnData
 

Macros

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)
 
#define CHANGES_THRESHOLD   100
 

Typedefs

typedef struct RelationSyncEntry RelationSyncEntry
 
typedef struct PGOutputTxnData PGOutputTxnData
 

Enumerations

enum  RowFilterPubAction { PUBACTION_INSERT , PUBACTION_UPDATE , PUBACTION_DELETE }
 

Functions

void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void pgoutput_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pgoutput_shutdown (LogicalDecodingContext *ctx)
 
static void pgoutput_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
 
static void pgoutput_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static void pgoutput_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pgoutput_origin_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pgoutput_begin_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pgoutput_commit_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_rollback_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
static void pgoutput_stream_start (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_stop (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_abort (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pgoutput_stream_commit (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_stream_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static List * LoadPublications (List *pubnames)
 
static void publication_invalidation_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void send_relation_and_attrs (Relation relation, TransactionId xid, LogicalDecodingContext *ctx, Bitmapset *columns)
 
static void send_repl_origin (LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
 
static void update_replication_progress (LogicalDecodingContext *ctx, bool skipped_xact)
 
static void init_rel_sync_cache (MemoryContext decoding_context)
 
static void cleanup_rel_sync_cache (TransactionId xid, bool is_commit)
 
static RelationSyncEntry * get_rel_sync_entry (PGOutputData *data, Relation relation)
 
static void rel_sync_cache_relation_cb (Datum arg, Oid relid)
 
static void rel_sync_cache_publication_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void set_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static bool get_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static void init_tuple_slot (PGOutputData *data, Relation relation, RelationSyncEntry *entry)
 
static EState * create_estate_for_relation (Relation rel)
 
static void pgoutput_row_filter_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
static bool pgoutput_row_filter_exec_expr (ExprState *state, ExprContext *econtext)
 
static bool pgoutput_row_filter (Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
 
static void pgoutput_column_list_init (PGOutputData *data, List *publications, RelationSyncEntry *entry)
 
static void parse_output_parameters (List *options, PGOutputData *data)
 
static void pgoutput_send_begin (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void maybe_send_schema (LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
 
static void pgoutput_ensure_entry_cxt (PGOutputData *data, RelationSyncEntry *entry)
 

Variables

 PG_MODULE_MAGIC
 
static bool publications_valid
 
static bool in_streaming
 
static HTAB * RelationSyncCache = NULL
 

Macro Definition Documentation

◆ CHANGES_THRESHOLD

#define CHANGES_THRESHOLD   100

◆ NUM_ROWFILTER_PUBACTIONS

#define NUM_ROWFILTER_PUBACTIONS   (PUBACTION_DELETE+1)

Definition at line 108 of file pgoutput.c.

Typedef Documentation

◆ PGOutputTxnData

◆ RelationSyncEntry

Enumeration Type Documentation

◆ RowFilterPubAction

Enumerator
PUBACTION_INSERT 
PUBACTION_UPDATE 
PUBACTION_DELETE 

Definition at line 101 of file pgoutput.c.

102 {
106 };
@ PUBACTION_INSERT
Definition: pgoutput.c:103
@ PUBACTION_UPDATE
Definition: pgoutput.c:104
@ PUBACTION_DELETE
Definition: pgoutput.c:105

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks * cb)

Definition at line 250 of file pgoutput.c.

251 {
253 
260 
267 
268  /* transaction streaming */
276  /* transaction streaming - two-phase commit */
278 }
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:963
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:581
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:392
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:1588
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:1352
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:598
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:1704
static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: pgoutput.c:626
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:1718
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: pgoutput.c:1818
static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:1873
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:513
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:1847
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:612
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1799
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1768
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:250
static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: pgoutput.c:1658
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:549
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb

References AssertVariableIsOfType, OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::begin_prepare_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::commit_prepared_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::message_cb, pgoutput_begin_prepare_txn(), pgoutput_begin_txn(), pgoutput_change(), pgoutput_commit_prepared_txn(), pgoutput_commit_txn(), pgoutput_message(), pgoutput_origin_filter(), pgoutput_prepare_txn(), pgoutput_rollback_prepared_txn(), pgoutput_shutdown(), pgoutput_startup(), pgoutput_stream_abort(), pgoutput_stream_commit(), pgoutput_stream_prepare_txn(), pgoutput_stream_start(), pgoutput_stream_stop(), pgoutput_truncate(), OutputPluginCallbacks::prepare_cb, OutputPluginCallbacks::rollback_prepared_cb, OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_prepare_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

◆ cleanup_rel_sync_cache()

static void cleanup_rel_sync_cache ( TransactionId  xid,
bool  is_commit 
)
static

Definition at line 2232 of file pgoutput.c.

2233 {
2234  HASH_SEQ_STATUS hash_seq;
2235  RelationSyncEntry *entry;
2236  ListCell *lc;
2237 
2238  Assert(RelationSyncCache != NULL);
2239 
2240  hash_seq_init(&hash_seq, RelationSyncCache);
2241  while ((entry = hash_seq_search(&hash_seq)) != NULL)
2242  {
2243  /*
2244  * We can set the schema_sent flag for an entry that has committed xid
2245  * in the list as that ensures that the subscriber would have the
2246  * corresponding schema and we don't need to send it unless there is
2247  * any invalidation for that relation.
2248  */
2249  foreach(lc, entry->streamed_txns)
2250  {
2251  if (xid == (uint32) lfirst_int(lc))
2252  {
2253  if (is_commit)
2254  entry->schema_sent = true;
2255 
2256  entry->streamed_txns =
2257  foreach_delete_current(entry->streamed_txns, lc);
2258  break;
2259  }
2260  }
2261  }
2262 }
unsigned int uint32
Definition: c.h:441
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
Assert(fmt[strlen(fmt) - 1] !='\n')
#define lfirst_int(lc)
Definition: pg_list.h:170
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:369
static HTAB * RelationSyncCache
Definition: pgoutput.c:213
List * streamed_txns
Definition: pgoutput.c:135

References Assert(), foreach_delete_current, hash_seq_init(), hash_seq_search(), lfirst_int, RelationSyncCache, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_stream_abort(), and pgoutput_stream_commit().

◆ create_estate_for_relation()

static EState * create_estate_for_relation ( Relation  rel)
static

Definition at line 758 of file pgoutput.c.

759 {
760  EState *estate;
761  RangeTblEntry *rte;
762 
763  estate = CreateExecutorState();
764 
765  rte = makeNode(RangeTblEntry);
766  rte->rtekind = RTE_RELATION;
767  rte->relid = RelationGetRelid(rel);
768  rte->relkind = rel->rd_rel->relkind;
769  rte->rellockmode = AccessShareLock;
770  ExecInitRangeTable(estate, list_make1(rte));
771 
772  estate->es_output_cid = GetCurrentCommandId(false);
773 
774  return estate;
775 }
void ExecInitRangeTable(EState *estate, List *rangeTable)
Definition: execUtils.c:751
EState * CreateExecutorState(void)
Definition: execUtils.c:90
#define AccessShareLock
Definition: lockdefs.h:36
#define makeNode(_type_)
Definition: nodes.h:621
@ RTE_RELATION
Definition: parsenodes.h:998
#define list_make1(x1)
Definition: pg_list.h:206
#define RelationGetRelid(relation)
Definition: rel.h:489
CommandId es_output_cid
Definition: execnodes.h:604
RTEKind rtekind
Definition: parsenodes.h:1015
Form_pg_class rd_rel
Definition: rel.h:109
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:814

References AccessShareLock, CreateExecutorState(), EState::es_output_cid, ExecInitRangeTable(), GetCurrentCommandId(), list_make1, makeNode, RelationData::rd_rel, RelationGetRelid, RangeTblEntry::relid, RangeTblEntry::relkind, RangeTblEntry::rellockmode, RTE_RELATION, and RangeTblEntry::rtekind.

Referenced by pgoutput_row_filter_init().

◆ get_rel_sync_entry()

static RelationSyncEntry * get_rel_sync_entry ( PGOutputData * data,
Relation  relation 
)
static

Definition at line 1963 of file pgoutput.c.

1964 {
1965  RelationSyncEntry *entry;
1966  bool found;
1967  MemoryContext oldctx;
1968  Oid relid = RelationGetRelid(relation);
1969 
1970  Assert(RelationSyncCache != NULL);
1971 
1972  /* Find cached relation info, creating if not found */
1973  entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
1974  (void *) &relid,
1975  HASH_ENTER, &found);
1976  Assert(entry != NULL);
1977 
1978  /* initialize entry, if it's new */
1979  if (!found)
1980  {
1981  entry->replicate_valid = false;
1982  entry->schema_sent = false;
1983  entry->streamed_txns = NIL;
1984  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
1985  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
1986  entry->new_slot = NULL;
1987  entry->old_slot = NULL;
1988  memset(entry->exprstate, 0, sizeof(entry->exprstate));
1989  entry->entry_cxt = NULL;
1990  entry->publish_as_relid = InvalidOid;
1991  entry->columns = NULL;
1992  entry->attrmap = NULL;
1993  }
1994 
1995  /* Validate the entry */
1996  if (!entry->replicate_valid)
1997  {
1998  Oid schemaId = get_rel_namespace(relid);
1999  List *pubids = GetRelationPublications(relid);
2000 
2001  /*
2002  * We don't acquire a lock on the namespace system table as we build
2003  * the cache entry using a historic snapshot and all the later changes
2004  * are absorbed while decoding WAL.
2005  */
2006  List *schemaPubids = GetSchemaPublications(schemaId);
2007  ListCell *lc;
2008  Oid publish_as_relid = relid;
2009  int publish_ancestor_level = 0;
2010  bool am_partition = get_rel_relispartition(relid);
2011  char relkind = get_rel_relkind(relid);
2012  List *rel_publications = NIL;
2013 
2014  /* Reload publications if needed before use. */
2015  if (!publications_valid)
2016  {
2017  oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2018  if (data->publications)
2019  {
2020  list_free_deep(data->publications);
2021  data->publications = NIL;
2022  }
2023  data->publications = LoadPublications(data->publication_names);
2024  MemoryContextSwitchTo(oldctx);
2025  publications_valid = true;
2026  }
2027 
2028  /*
2029  * Reset schema_sent status as the relation definition may have
2030  * changed. Also reset pubactions to empty in case rel was dropped
2031  * from a publication. Also free any objects that depended on the
2032  * earlier definition.
2033  */
2034  entry->schema_sent = false;
2035  list_free(entry->streamed_txns);
2036  entry->streamed_txns = NIL;
2037  bms_free(entry->columns);
2038  entry->columns = NULL;
2039  entry->pubactions.pubinsert = false;
2040  entry->pubactions.pubupdate = false;
2041  entry->pubactions.pubdelete = false;
2042  entry->pubactions.pubtruncate = false;
2043 
2044  /*
2045  * Tuple slots cleanups. (Will be rebuilt later if needed).
2046  */
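2047  if (entry->old_slot)
2048  ExecDropSingleTupleTableSlot(entry->old_slot);
2049  if (entry->new_slot)
2050  ExecDropSingleTupleTableSlot(entry->new_slot);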
2051 
2052  entry->old_slot = NULL;
2053  entry->new_slot = NULL;
2054 
2055  if (entry->attrmap)
2056  free_attrmap(entry->attrmap);
2057  entry->attrmap = NULL;
2058 
2059  /*
2060  * Row filter cache cleanups.
2061  */
2062  if (entry->entry_cxt)
2063  MemoryContextDelete(entry->entry_cxt);
2064 
2065  entry->entry_cxt = NULL;
2066  entry->estate = NULL;
2067  memset(entry->exprstate, 0, sizeof(entry->exprstate));
2068 
2069  /*
2070  * Build publication cache. We can't use one provided by relcache as
2071  * relcache considers all publications that the given relation is in,
2072  * but here we only need to consider ones that the subscriber
2073  * requested.
2074  */
2075  foreach(lc, data->publications)
2076  {
2077  Publication *pub = lfirst(lc);
2078  bool publish = false;
2079 
2080  /*
2081  * Under what relid should we publish changes in this publication?
2082  * We'll use the top-most relid across all publications. Also
2083  * track the ancestor level for this publication.
2084  */
2085  Oid pub_relid = relid;
2086  int ancestor_level = 0;
2087 
2088  /*
2089  * If this is a FOR ALL TABLES publication, pick the partition
2090  * root and set the ancestor level accordingly.
2091  */
2092  if (pub->alltables)
2093  {
2094  publish = true;
2095  if (pub->pubviaroot && am_partition)
2096  {
2097  List *ancestors = get_partition_ancestors(relid);
2098 
2099  pub_relid = llast_oid(ancestors);
2100  ancestor_level = list_length(ancestors);
2101  }
2102  }
2103 
2104  if (!publish)
2105  {
2106  bool ancestor_published = false;
2107 
2108  /*
2109  * For a partition, check if any of the ancestors are
2110  * published. If so, note down the topmost ancestor that is
2111  * published via this publication, which will be used as the
2112  * relation via which to publish the partition's changes.
2113  */
2114  if (am_partition)
2115  {
2116  Oid ancestor;
2117  int level;
2118  List *ancestors = get_partition_ancestors(relid);
2119 
2120  ancestor = GetTopMostAncestorInPublication(pub->oid,
2121  ancestors,
2122  &level);
2123 
2124  if (ancestor != InvalidOid)
2125  {
2126  ancestor_published = true;
2127  if (pub->pubviaroot)
2128  {
2129  pub_relid = ancestor;
2130  ancestor_level = level;
2131  }
2132  }
2133  }
2134 
2135  if (list_member_oid(pubids, pub->oid) ||
2136  list_member_oid(schemaPubids, pub->oid) ||
2137  ancestor_published)
2138  publish = true;
2139  }
2140 
2141  /*
2142  * If the relation is to be published, determine actions to
2143  * publish, and list of columns, if appropriate.
2144  *
2145  * Don't publish changes for partitioned tables, because
2146  * publishing those of its partitions suffices, unless partition
2147  * changes won't be published due to pubviaroot being set.
2148  */
2149  if (publish &&
2150  (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2151  {
2152  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2153  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2154  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2155  entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2156 
2157  /*
2158  * We want to publish the changes as the top-most ancestor
2159  * across all publications. So we need to check if the already
2160  * calculated level is higher than the new one. If yes, we can
2161  * ignore the new value (as it's a child). Otherwise the new
2162  * value is an ancestor, so we keep it.
2163  */
2164  if (publish_ancestor_level > ancestor_level)
2165  continue;
2166 
2167  /*
2168  * If we found an ancestor higher up in the tree, discard the
2169  * list of publications through which we replicate it, and use
2170  * the new ancestor.
2171  */
2172  if (publish_ancestor_level < ancestor_level)
2173  {
2174  publish_as_relid = pub_relid;
2175  publish_ancestor_level = ancestor_level;
2176 
2177  /* reset the publication list for this relation */
2178  rel_publications = NIL;
2179  }
2180  else
2181  {
2182  /* Same ancestor level, has to be the same OID. */
2183  Assert(publish_as_relid == pub_relid);
2184  }
2185 
2186  /* Track publications for this ancestor. */
2187  rel_publications = lappend(rel_publications, pub);
2188  }
2189  }
2190 
2191  entry->publish_as_relid = publish_as_relid;
2192 
2193  /*
2194  * Initialize the tuple slot, map, and row filter. These are only used
2195  * when publishing inserts, updates, or deletes.
2196  */
2197  if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2198  entry->pubactions.pubdelete)
2199  {
2200  /* Initialize the tuple slot and map */
2201  init_tuple_slot(data, relation, entry);
2202 
2203  /* Initialize the row filter */
2204  pgoutput_row_filter_init(data, rel_publications, entry);
2205 
2206  /* Initialize the column list */
2207  pgoutput_column_list_init(data, rel_publications, entry);
2208  }
2209 
2210  list_free(pubids);
2211  list_free(schemaPubids);
2212  list_free(rel_publications);
2213 
2214  entry->replicate_valid = true;
2215  }
2216 
2217  return entry;
2218 }
void free_attrmap(AttrMap *map)
Definition: attmap.c:57
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1254
@ HASH_ENTER
Definition: hsearch.h:114
List * lappend(List *list, void *datum)
Definition: list.c:336
void list_free(List *list)
Definition: list.c:1505
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:701
void list_free_deep(List *list)
Definition: list.c:1519
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:2008
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1984
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1933
MemoryContext CacheMemoryContext
Definition: mcxt.c:51
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:218
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
List * get_partition_ancestors(Oid relid)
Definition: partition.c:133
const void * data
#define lfirst(lc)
Definition: pg_list.h:169
static int list_length(const List *l)
Definition: pg_list.h:149
#define NIL
Definition: pg_list.h:65
#define llast_oid(l)
Definition: pg_list.h:196
List * GetSchemaPublications(Oid schemaid)
List * GetRelationPublications(Oid relid)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:1731
static void init_tuple_slot(PGOutputData *data, Relation relation, RelationSyncEntry *entry)
Definition: pgoutput.c:1079
static void pgoutput_row_filter_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:829
static void pgoutput_column_list_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:978
static bool publications_valid
Definition: pgoutput.c:82
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
Definition: pg_list.h:51
PublicationActions pubactions
ExprState * exprstate[NUM_ROWFILTER_PUBACTIONS]
Definition: pgoutput.c:148
Bitmapset * columns
Definition: pgoutput.c:174
PublicationActions pubactions
Definition: pgoutput.c:139
TupleTableSlot * old_slot
Definition: pgoutput.c:151
bool replicate_valid
Definition: pgoutput.c:132
MemoryContext entry_cxt
Definition: pgoutput.c:180
EState * estate
Definition: pgoutput.c:149
TupleTableSlot * new_slot
Definition: pgoutput.c:150
AttrMap * attrmap
Definition: pgoutput.c:167

References Publication::alltables, Assert(), RelationSyncEntry::attrmap, bms_free(), CacheMemoryContext, RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecDropSingleTupleTableSlot(), RelationSyncEntry::exprstate, free_attrmap(), get_partition_ancestors(), get_rel_namespace(), get_rel_relispartition(), get_rel_relkind(), GetRelationPublications(), GetSchemaPublications(), GetTopMostAncestorInPublication(), HASH_ENTER, hash_search(), init_tuple_slot(), InvalidOid, lappend(), lfirst, list_free(), list_free_deep(), list_length(), list_member_oid(), llast_oid, LoadPublications(), MemoryContextDelete(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, NIL, Publication::oid, RelationSyncEntry::old_slot, pgoutput_column_list_init(), pgoutput_row_filter_init(), RelationSyncEntry::pubactions, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, publications_valid, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, PublicationActions::pubupdate, Publication::pubviaroot, RelationGetRelid, RelationSyncCache, RelationSyncEntry::replicate_valid, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ get_schema_sent_in_streamed_txn()

static bool get_schema_sent_in_streamed_txn ( RelationSyncEntry * entry,
TransactionId  xid 
)
static

Definition at line 1924 of file pgoutput.c.

1925 {
1926  ListCell *lc;
1927 
1928  foreach(lc, entry->streamed_txns)
1929  {
1930  if (xid == (uint32) lfirst_int(lc))
1931  return true;
1932  }
1933 
1934  return false;
1935 }

References lfirst_int, and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

◆ init_rel_sync_cache()

static void init_rel_sync_cache ( MemoryContext  decoding_context)
static

Definition at line 1893 of file pgoutput.c.

1894 {
1895  HASHCTL ctl;
1896 
1897  if (RelationSyncCache != NULL)
1898  return;
1899 
1900  /* Make a new hash table for the cache */
1901  ctl.keysize = sizeof(Oid);
1902  ctl.entrysize = sizeof(RelationSyncEntry);
1903  ctl.hcxt = cachectx;
1904 
1905  RelationSyncCache = hash_create("logical replication output relation cache",
1906  128, &ctl,
1907  HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
1908 
1909  Assert(RelationSyncCache != NULL);
1910 
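1911  CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1912  CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1913  rel_sync_cache_publication_cb,
1914  (Datum) 0);
1915  CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1916  rel_sync_cache_publication_cb,
1917  (Datum) 0);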
1918 }
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1561
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1519
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:2319
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:2268
uintptr_t Datum
Definition: postgres.h:411
Size keysize
Definition: hsearch.h:75
Size entrysize
Definition: hsearch.h:76
MemoryContext hcxt
Definition: hsearch.h:86
@ PUBLICATIONNAMESPACEMAP
Definition: syscache.h:82
@ PUBLICATIONRELMAP
Definition: syscache.h:85

References Assert(), CacheRegisterRelcacheCallback(), CacheRegisterSyscacheCallback(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, PUBLICATIONNAMESPACEMAP, PUBLICATIONRELMAP, rel_sync_cache_publication_cb(), rel_sync_cache_relation_cb(), and RelationSyncCache.

Referenced by pgoutput_startup().

◆ init_tuple_slot()

static void init_tuple_slot ( PGOutputData * data,
Relation  relation,
RelationSyncEntry * entry 
)
static

Definition at line 1079 of file pgoutput.c.

1081 {
1082  MemoryContext oldctx;
1083  TupleDesc oldtupdesc;
1084  TupleDesc newtupdesc;
1085 
1086  oldctx = MemoryContextSwitchTo(data->cachectx);
1087 
1088  /*
1089  * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1090  * live as long as the cache remains.
1091  */
1092  oldtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
1093  newtupdesc = CreateTupleDescCopy(RelationGetDescr(relation));
1094 
1095  entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1096  entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1097 
1098  MemoryContextSwitchTo(oldctx);
1099 
1100  /*
1101  * Cache the map that will be used to convert the relation's tuples into
1102  * the ancestor's format, if needed.
1103  */
1104  if (entry->publish_as_relid != RelationGetRelid(relation))
1105  {
1106  Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1107  TupleDesc indesc = RelationGetDescr(relation);
1108  TupleDesc outdesc = RelationGetDescr(ancestor);
1109 
1110  /* Map must live as long as the session does. */
1111  oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1112 
1113  entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc);
1114 
1115  MemoryContextSwitchTo(oldctx);
1116  RelationClose(ancestor);
1117  }
1118 }
AttrMap * build_attrmap_by_name_if_req(TupleDesc indesc, TupleDesc outdesc)
Definition: attmap.c:259
const TupleTableSlotOps TTSOpsHeapTuple
Definition: execTuples.c:84
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1238
#define RelationGetDescr(relation)
Definition: rel.h:515
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2053
void RelationClose(Relation relation)
Definition: relcache.c:2159
TupleDesc CreateTupleDescCopy(TupleDesc tupdesc)
Definition: tupdesc.c:111

References RelationSyncEntry::attrmap, build_attrmap_by_name_if_req(), CacheMemoryContext, CreateTupleDescCopy(), data, MakeSingleTupleTableSlot(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, RelationSyncEntry::old_slot, RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), and TTSOpsHeapTuple.

Referenced by get_rel_sync_entry().

◆ LoadPublications()

static List * LoadPublications ( List * pubnames)
static

Definition at line 1731 of file pgoutput.c.

1732 {
1733  List *result = NIL;
1734  ListCell *lc;
1735 
1736  foreach(lc, pubnames)
1737  {
1738  char *pubname = (char *) lfirst(lc);
1739  Publication *pub = GetPublicationByName(pubname, false);
1740 
1741  result = lappend(result, pub);
1742  }
1743 
1744  return result;
1745 }
Publication * GetPublicationByName(const char *pubname, bool missing_ok)

References GetPublicationByName(), lappend(), lfirst, and NIL.

Referenced by get_rel_sync_entry().

◆ maybe_send_schema()

static void maybe_send_schema ( LogicalDecodingContext ctx,
ReorderBufferChange change,
Relation  relation,
RelationSyncEntry relentry 
)
static

Definition at line 644 of file pgoutput.c.

647 {
648  bool schema_sent;
649  TransactionId xid = InvalidTransactionId;
650  TransactionId topxid = InvalidTransactionId;
651 
652  /*
653  * Remember XID of the (sub)transaction for the change. We don't care if
654  * it's top-level transaction or not (we have already sent that XID in
655  * start of the current streaming block).
656  *
657  * If we're not in a streaming block, just use InvalidTransactionId and
658  * the write methods will not include it.
659  */
660  if (in_streaming)
661  xid = change->txn->xid;
662 
663  if (change->txn->toptxn)
664  topxid = change->txn->toptxn->xid;
665  else
666  topxid = xid;
667 
668  /*
669  * Do we need to send the schema? We do track streamed transactions
670  * separately, because those may be applied later (and the regular
671  * transactions won't see their effects until then) and in an order that
672  * we don't know at this point.
673  *
674  * XXX There is a scope of optimization here. Currently, we always send
675  * the schema first time in a streaming transaction but we can probably
676  * avoid that by checking 'relentry->schema_sent' flag. However, before
677  * doing that we need to study its impact on the case where we have a mix
678  * of streaming and non-streaming transactions.
679  */
680  if (in_streaming)
681  schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
682  else
683  schema_sent = relentry->schema_sent;
684 
685  /* Nothing to do if we already sent the schema. */
686  if (schema_sent)
687  return;
688 
689  /*
690  * Send the schema. If the changes will be published using an ancestor's
691  * schema, not the relation's own, send that ancestor's schema before
692  * sending relation's own (XXX - maybe sending only the former suffices?).
693  */
694  if (relentry->publish_as_relid != RelationGetRelid(relation))
695  {
696  Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
697 
698  send_relation_and_attrs(ancestor, xid, ctx, relentry->columns);
699  RelationClose(ancestor);
700  }
701 
702  send_relation_and_attrs(relation, xid, ctx, relentry->columns);
703 
704  if (in_streaming)
705  set_schema_sent_in_streamed_txn(relentry, topxid);
706  else
707  relentry->schema_sent = true;
708 }
uint32 TransactionId
Definition: c.h:587
static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, Bitmapset *columns)
Definition: pgoutput.c:714
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1942
static bool in_streaming
Definition: pgoutput.c:83
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1924
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
struct ReorderBufferTXN * toptxn
TransactionId xid
#define InvalidTransactionId
Definition: transam.h:31

References RelationSyncEntry::columns, get_schema_sent_in_streamed_txn(), in_streaming, InvalidTransactionId, RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetRelid, RelationIdGetRelation(), RelationSyncEntry::schema_sent, send_relation_and_attrs(), set_schema_sent_in_streamed_txn(), ReorderBufferTXN::toptxn, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by pgoutput_change(), and pgoutput_truncate().

◆ parse_output_parameters()

static void parse_output_parameters ( List options,
PGOutputData data 
)
static

Definition at line 281 of file pgoutput.c.

282 {
283  ListCell *lc;
284  bool protocol_version_given = false;
285  bool publication_names_given = false;
286  bool binary_option_given = false;
287  bool messages_option_given = false;
288  bool streaming_given = false;
289  bool two_phase_option_given = false;
290 
291  data->binary = false;
292  data->streaming = false;
293  data->messages = false;
294  data->two_phase = false;
295 
296  foreach(lc, options)
297  {
298  DefElem *defel = (DefElem *) lfirst(lc);
299 
300  Assert(defel->arg == NULL || IsA(defel->arg, String));
301 
302  /* Check each param, whether or not we recognize it */
303  if (strcmp(defel->defname, "proto_version") == 0)
304  {
305  unsigned long parsed;
306  char *endptr;
307 
308  if (protocol_version_given)
309  ereport(ERROR,
310  (errcode(ERRCODE_SYNTAX_ERROR),
311  errmsg("conflicting or redundant options")));
312  protocol_version_given = true;
313 
314  errno = 0;
315  parsed = strtoul(strVal(defel->arg), &endptr, 10);
316  if (errno != 0 || *endptr != '\0')
317  ereport(ERROR,
318  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
319  errmsg("invalid proto_version")));
320 
321  if (parsed > PG_UINT32_MAX)
322  ereport(ERROR,
323  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
324  errmsg("proto_version \"%s\" out of range",
325  strVal(defel->arg))));
326 
327  data->protocol_version = (uint32) parsed;
328  }
329  else if (strcmp(defel->defname, "publication_names") == 0)
330  {
331  if (publication_names_given)
332  ereport(ERROR,
333  (errcode(ERRCODE_SYNTAX_ERROR),
334  errmsg("conflicting or redundant options")));
335  publication_names_given = true;
336 
337  if (!SplitIdentifierString(strVal(defel->arg), ',',
338  &data->publication_names))
339  ereport(ERROR,
340  (errcode(ERRCODE_INVALID_NAME),
341  errmsg("invalid publication_names syntax")));
342  }
343  else if (strcmp(defel->defname, "binary") == 0)
344  {
345  if (binary_option_given)
346  ereport(ERROR,
347  (errcode(ERRCODE_SYNTAX_ERROR),
348  errmsg("conflicting or redundant options")));
349  binary_option_given = true;
350 
351  data->binary = defGetBoolean(defel);
352  }
353  else if (strcmp(defel->defname, "messages") == 0)
354  {
355  if (messages_option_given)
356  ereport(ERROR,
357  (errcode(ERRCODE_SYNTAX_ERROR),
358  errmsg("conflicting or redundant options")));
359  messages_option_given = true;
360 
361  data->messages = defGetBoolean(defel);
362  }
363  else if (strcmp(defel->defname, "streaming") == 0)
364  {
365  if (streaming_given)
366  ereport(ERROR,
367  (errcode(ERRCODE_SYNTAX_ERROR),
368  errmsg("conflicting or redundant options")));
369  streaming_given = true;
370 
371  data->streaming = defGetBoolean(defel);
372  }
373  else if (strcmp(defel->defname, "two_phase") == 0)
374  {
375  if (two_phase_option_given)
376  ereport(ERROR,
377  (errcode(ERRCODE_SYNTAX_ERROR),
378  errmsg("conflicting or redundant options")));
379  two_phase_option_given = true;
380 
381  data->two_phase = defGetBoolean(defel);
382  }
383  else
384  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
385  }
386 }
#define PG_UINT32_MAX
Definition: c.h:525
bool defGetBoolean(DefElem *def)
Definition: define.c:108
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define ereport(elevel,...)
Definition: elog.h:143
#define IsA(nodeptr, _type_)
Definition: nodes.h:624
char * defname
Definition: parsenodes.h:765
Node * arg
Definition: parsenodes.h:766
Definition: value.h:58
#define strVal(v)
Definition: value.h:72
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3715

References DefElem::arg, Assert(), data, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, IsA, lfirst, PG_UINT32_MAX, SplitIdentifierString(), and strVal.

Referenced by pgoutput_startup().

◆ pgoutput_begin_prepare_txn()

static void pgoutput_begin_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 581 of file pgoutput.c.

582 {
583  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
584 
585  OutputPluginPrepareWrite(ctx, !send_replication_origin);
586  logicalrep_write_begin_prepare(ctx->out, txn);
587 
588  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
589  send_replication_origin);
590 
591  OutputPluginWrite(ctx, true);
592 }
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:662
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:649
#define InvalidRepOriginId
Definition: origin.h:33
static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
Definition: pgoutput.c:2345
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:127
StringInfo out
Definition: logical.h:71
RepOriginId origin_id
XLogRecPtr origin_lsn

References InvalidRepOriginId, logicalrep_write_begin_prepare(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and send_repl_origin().

Referenced by _PG_output_plugin_init().

◆ pgoutput_begin_txn()

static void pgoutput_begin_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 513 of file pgoutput.c.

514 {
515  PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
516  sizeof(PGOutputTxnData));
517 
518  txn->output_plugin_private = txndata;
519 }
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:906
MemoryContext context
Definition: logical.h:36
void * output_plugin_private

References LogicalDecodingContext::context, MemoryContextAllocZero(), and ReorderBufferTXN::output_plugin_private.

Referenced by _PG_output_plugin_init().

◆ pgoutput_change()

static void pgoutput_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  rel,
ReorderBufferChange change 
)
static

Definition at line 1352 of file pgoutput.c.

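1354 {
1355  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1356  PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1357  MemoryContext old;
1358  RelationSyncEntry *relentry;
1359  TransactionId xid = InvalidTransactionId;
1360  Relation ancestor = NULL;
1361  Relation targetrel = relation;
1362  ReorderBufferChangeType action = change->action;
1363  TupleTableSlot *old_slot = NULL;
1364  TupleTableSlot *new_slot = NULL;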
1365 
1366  update_replication_progress(ctx, false);
1367 
1368  if (!is_publishable_relation(relation))
1369  return;
1370 
1371  /*
1372  * Remember the xid for the change in streaming mode. We need to send xid
1373  * with each change in the streaming mode so that subscriber can make
1374  * their association and on aborts, it can discard the corresponding
1375  * changes.
1376  */
1377  if (in_streaming)
1378  xid = change->txn->xid;
1379 
1380  relentry = get_rel_sync_entry(data, relation);
1381 
1382  /* First check the table filter */
1383  switch (action)
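1384  {
1385  case REORDER_BUFFER_CHANGE_INSERT:
1386  if (!relentry->pubactions.pubinsert)
1387  return;
1388  break;
1389  case REORDER_BUFFER_CHANGE_UPDATE:
1390  if (!relentry->pubactions.pubupdate)
1391  return;
1392  break;
1393  case REORDER_BUFFER_CHANGE_DELETE:
1394  if (!relentry->pubactions.pubdelete)
1395  return;
1396  break;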
1397  default:
1398  Assert(false);
1399  }
1400 
1401  /* Avoid leaking memory by using and resetting our own context */
1402  old = MemoryContextSwitchTo(data->context);
1403 
1404  /* Send the data */
1405  switch (action)
1406  {
1407  case REORDER_BUFFER_CHANGE_INSERT:
1408  new_slot = relentry->new_slot;
1409  ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
1410  new_slot, false);
1411 
1412  /* Switch relation if publishing via root. */
1413  if (relentry->publish_as_relid != RelationGetRelid(relation))
1414  {
1415  Assert(relation->rd_rel->relispartition);
1416  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1417  targetrel = ancestor;
1418  /* Convert tuple if needed. */
1419  if (relentry->attrmap)
1420  {
1421  TupleDesc tupdesc = RelationGetDescr(targetrel);
1422 
1423  new_slot = execute_attr_map_slot(relentry->attrmap,
1424  new_slot,
1425  MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1426  }
1427  }
1428 
1429  /* Check row filter */
1430  if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
1431  &action))
1432  break;
1433 
1434  /*
1435  * Send BEGIN if we haven't yet.
1436  *
1437  * We send the BEGIN message after ensuring that we will actually
1438  * send the change. This avoids sending a pair of BEGIN/COMMIT
1439  * messages for empty transactions.
1440  */
1441  if (txndata && !txndata->sent_begin_txn)
1442  pgoutput_send_begin(ctx, txn);
1443 
1444  /*
1445  * Schema should be sent using the original relation because it
1446  * also sends the ancestor's relation.
1447  */
1448  maybe_send_schema(ctx, change, relation, relentry);
1449 
1450  OutputPluginPrepareWrite(ctx, true);
1451  logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1452  data->binary, relentry->columns);
1453  OutputPluginWrite(ctx, true);
1454  break;
1455  case REORDER_BUFFER_CHANGE_UPDATE:
1456  if (change->data.tp.oldtuple)
1457  {
1458  old_slot = relentry->old_slot;
1459  ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
1460  old_slot, false);
1461  }
1462 
1463  new_slot = relentry->new_slot;
1464  ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
1465  new_slot, false);
1466 
1467  /* Switch relation if publishing via root. */
1468  if (relentry->publish_as_relid != RelationGetRelid(relation))
1469  {
1470  Assert(relation->rd_rel->relispartition);
1471  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1472  targetrel = ancestor;
1473  /* Convert tuples if needed. */
1474  if (relentry->attrmap)
1475  {
1476  TupleDesc tupdesc = RelationGetDescr(targetrel);
1477 
1478  if (old_slot)
1479  old_slot = execute_attr_map_slot(relentry->attrmap,
1480  old_slot,
1481  MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1482 
1483  new_slot = execute_attr_map_slot(relentry->attrmap,
1484  new_slot,
1485  MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1486  }
1487  }
1488 
1489  /* Check row filter */
1490  if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
1491  relentry, &action))
1492  break;
1493 
1494  /* Send BEGIN if we haven't yet */
1495  if (txndata && !txndata->sent_begin_txn)
1496  pgoutput_send_begin(ctx, txn);
1497 
1498  maybe_send_schema(ctx, change, relation, relentry);
1499 
1500  OutputPluginPrepareWrite(ctx, true);
1501 
1502  /*
1503  * Updates could be transformed to inserts or deletes based on the
1504  * results of the row filter for old and new tuple.
1505  */
1506  switch (action)
1507  {
1508  case REORDER_BUFFER_CHANGE_INSERT:
1509  logicalrep_write_insert(ctx->out, xid, targetrel,
1510  new_slot, data->binary,
1511  relentry->columns);
1512  break;
1513  case REORDER_BUFFER_CHANGE_UPDATE:
1514  logicalrep_write_update(ctx->out, xid, targetrel,
1515  old_slot, new_slot, data->binary,
1516  relentry->columns);
1517  break;
1518  case REORDER_BUFFER_CHANGE_DELETE:
1519  logicalrep_write_delete(ctx->out, xid, targetrel,
1520  old_slot, data->binary);
1521  break;
1522  default:
1523  Assert(false);
1524  }
1525 
1526  OutputPluginWrite(ctx, true);
1527  break;
1528  case REORDER_BUFFER_CHANGE_DELETE:
1529  if (change->data.tp.oldtuple)
1530  {
1531  old_slot = relentry->old_slot;
1532 
1533  ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
1534  old_slot, false);
1535 
1536  /* Switch relation if publishing via root. */
1537  if (relentry->publish_as_relid != RelationGetRelid(relation))
1538  {
1539  Assert(relation->rd_rel->relispartition);
1540  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1541  targetrel = ancestor;
1542  /* Convert tuple if needed. */
1543  if (relentry->attrmap)
1544  {
1545  TupleDesc tupdesc = RelationGetDescr(targetrel);
1546 
1547  old_slot = execute_attr_map_slot(relentry->attrmap,
1548  old_slot,
1549  MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
1550  }
1551  }
1552 
1553  /* Check row filter */
1554  if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
1555  relentry, &action))
1556  break;
1557 
1558  /* Send BEGIN if we haven't yet */
1559  if (txndata && !txndata->sent_begin_txn)
1560  pgoutput_send_begin(ctx, txn);
1561 
1562  maybe_send_schema(ctx, change, relation, relentry);
1563 
1564  OutputPluginPrepareWrite(ctx, true);
1565  logicalrep_write_delete(ctx->out, xid, targetrel,
1566  old_slot, data->binary);
1567  OutputPluginWrite(ctx, true);
1568  }
1569  else
1570  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1571  break;
1572  default:
1573  Assert(false);
1574  }
1575 
1576  if (RelationIsValid(ancestor))
1577  {
1578  RelationClose(ancestor);
1579  ancestor = NULL;
1580  }
1581 
1582  /* Cleanup */
1583  MemoryContextSwitchTo(old);
1584  MemoryContextReset(data->context);
1585 }
#define DEBUG1
Definition: elog.h:24
TupleTableSlot * MakeTupleTableSlot(TupleDesc tupleDesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1112
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:83
TupleTableSlot * ExecStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1352
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
bool is_publishable_relation(Relation rel)
static void pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:527
static void update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
Definition: pgoutput.c:2383
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Relation relation)
Definition: pgoutput.c:1963
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:644
static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
Definition: pgoutput.c:1171
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary)
Definition: proto.c:533
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
Definition: proto.c:458
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns)
Definition: proto.c:414
#define RelationIsValid(relation)
Definition: rel.h:462
ReorderBufferChangeType
Definition: reorderbuffer.h:55
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:56
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:58
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:57
void * output_plugin_private
Definition: logical.h:76
ReorderBufferChangeType action
Definition: reorderbuffer.h:85
struct ReorderBufferChange::@105::@106 tp
union ReorderBufferChange::@105 data
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:177

References generate_unaccent_rules::action, ReorderBufferChange::action, Assert(), RelationSyncEntry::attrmap, RelationSyncEntry::columns, ReorderBufferChange::data, data, DEBUG1, elog, ExecStoreHeapTuple(), execute_attr_map_slot(), get_rel_sync_entry(), in_streaming, InvalidTransactionId, is_publishable_relation(), logicalrep_write_delete(), logicalrep_write_insert(), logicalrep_write_update(), MakeTupleTableSlot(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), RelationSyncEntry::new_slot, RelationSyncEntry::old_slot, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_row_filter(), pgoutput_send_begin(), RelationSyncEntry::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, RelationSyncEntry::publish_as_relid, PublicationActions::pubupdate, RelationData::rd_rel, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), RelationIsValid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, TTSOpsVirtual, ReorderBufferChange::txn, update_replication_progress(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_column_list_init()

static void pgoutput_column_list_init ( PGOutputData data,
List publications,
RelationSyncEntry entry 
)
static

Definition at line 978 of file pgoutput.c.

980 {
981  ListCell *lc;
982 
983  /*
984  * Find if there are any column lists for this relation. If there are,
985  * build a bitmap merging all the column lists.
986  *
987  * All the given publication-table mappings must be checked.
988  *
989  * Multiple publications might have multiple column lists for this
990  * relation.
991  *
992  * FOR ALL TABLES and FOR ALL TABLES IN SCHEMA implies "don't use column
993  * list" so it takes precedence.
994  */
995  foreach(lc, publications)
996  {
997  Publication *pub = lfirst(lc);
998  HeapTuple cftuple = NULL;
999  Datum cfdatum = 0;
1000 
1001  /*
1002  * Assume there's no column list. Only if we find pg_publication_rel
1003  * entry with a column list we'll switch it to false.
1004  */
1005  bool pub_no_list = true;
1006 
1007  /*
1008  * If the publication is FOR ALL TABLES then it is treated the same as
1009  * if there are no column lists (even if other publications have a
1010  * list).
1011  */
1012  if (!pub->alltables)
1013  {
1014  /*
1015  * Check for the presence of a column list in this publication.
1016  *
1017  * Note: If we find no pg_publication_rel row, it's a publication
1018  * defined for a whole schema, so it can't have a column list,
1019  * just like a FOR ALL TABLES publication.
1020  */
1021  cftuple = SearchSysCache2(PUBLICATIONRELMAP,
1022  ObjectIdGetDatum(entry->publish_as_relid),
1023  ObjectIdGetDatum(pub->oid));
1024 
1025  if (HeapTupleIsValid(cftuple))
1026  {
1027  /*
1028  * Lookup the column list attribute.
1029  *
1030  * Note: We update the pub_no_list value directly, because if
1031  * the value is NULL, we have no list (and vice versa).
1032  */
1033  cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
1034  Anum_pg_publication_rel_prattrs,
1035  &pub_no_list);
1036 
1037  /*
1038  * Build the column list bitmap in the per-entry context.
1039  *
1040  * We need to merge column lists from all publications, so we
1041  * update the same bitmapset. If the column list is null, we
1042  * interpret it as replicating all columns.
1043  */
1044  if (!pub_no_list) /* when not null */
1045  {
1046  pgoutput_ensure_entry_cxt(data, entry);
1047 
1048  entry->columns = pub_collist_to_bitmapset(entry->columns,
1049  cfdatum,
1050  entry->entry_cxt);
1051  }
1052  }
1053  }
1054 
1055  /*
1056  * Found a publication with no column list, so we're done. But first
1057  * discard column list we might have from preceding publications.
1058  */
1059  if (pub_no_list)
1060  {
1061  if (cftuple)
1062  ReleaseSysCache(cftuple);
1063 
1064  bms_free(entry->columns);
1065  entry->columns = NULL;
1066 
1067  break;
1068  }
1069 
1070  ReleaseSysCache(cftuple);
1071  } /* loop all subscribed publications */
1072 }
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
static void pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
Definition: pgoutput.c:807
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1221
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1434
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:1184

References Publication::alltables, bms_free(), RelationSyncEntry::columns, data, RelationSyncEntry::entry_cxt, HeapTupleIsValid, lfirst, ObjectIdGetDatum, Publication::oid, pgoutput_ensure_entry_cxt(), pub_collist_to_bitmapset(), PUBLICATIONRELMAP, RelationSyncEntry::publish_as_relid, ReleaseSysCache(), SearchSysCache2(), and SysCacheGetAttr().

Referenced by get_rel_sync_entry().

◆ pgoutput_commit_prepared_txn()

static void pgoutput_commit_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 612 of file pgoutput.c.

614 {
615  update_replication_progress(ctx, false);
616 
617  OutputPluginPrepareWrite(ctx, true);
618  logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
619  OutputPluginWrite(ctx, true);
620 }
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:248

References logicalrep_write_commit_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and update_replication_progress().

Referenced by _PG_output_plugin_init().

◆ pgoutput_commit_txn()

static void pgoutput_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 549 of file pgoutput.c.

551 {
552  PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
553  bool sent_begin_txn;
554 
555  Assert(txndata);
556 
557  /*
558  * We don't need to send the commit message unless some relevant change
559  * from this transaction has been sent to the downstream.
560  */
561  sent_begin_txn = txndata->sent_begin_txn;
562  update_replication_progress(ctx, !sent_begin_txn);
563  pfree(txndata);
564  txn->output_plugin_private = NULL;
565 
566  if (!sent_begin_txn)
567  {
568  elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
569  return;
570  }
571 
572  OutputPluginPrepareWrite(ctx, true);
573  logicalrep_write_commit(ctx->out, txn, commit_lsn);
574  OutputPluginWrite(ctx, true);
575 }
void pfree(void *pointer)
Definition: mcxt.c:1175
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:89
bool sent_begin_txn
Definition: pgoutput.c:209

References Assert(), DEBUG1, elog, logicalrep_write_commit(), LogicalDecodingContext::out, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pfree(), PGOutputTxnData::sent_begin_txn, update_replication_progress(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_ensure_entry_cxt()

static void pgoutput_ensure_entry_cxt ( PGOutputData data,
RelationSyncEntry entry 
)
static

Definition at line 807 of file pgoutput.c.

808 {
809  Relation relation;
810 
811  /* The context may already exist, in which case bail out. */
812  if (entry->entry_cxt)
813  return;
814 
815  relation = RelationIdGetRelation(entry->publish_as_relid);
816 
817  entry->entry_cxt = AllocSetContextCreate(data->cachectx,
818  "entry private context",
819  ALLOCSET_SMALL_SIZES);
820 
821  MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
822  RelationGetRelationName(relation));
823 }
#define AllocSetContextCreate
Definition: memutils.h:173
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:207
#define MemoryContextCopyAndSetIdentifier(cxt, id)
Definition: memutils.h:98
#define RelationGetRelationName(relation)
Definition: rel.h:523

References ALLOCSET_SMALL_SIZES, AllocSetContextCreate, data, RelationSyncEntry::entry_cxt, MemoryContextCopyAndSetIdentifier, RelationSyncEntry::publish_as_relid, RelationGetRelationName, and RelationIdGetRelation().

Referenced by pgoutput_column_list_init(), and pgoutput_row_filter_init().

◆ pgoutput_message()

static void pgoutput_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 1658 of file pgoutput.c.

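1661 {
1662  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1663  TransactionId xid = InvalidTransactionId;
1664 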
1665  update_replication_progress(ctx, false);
1666 
1667  if (!data->messages)
1668  return;
1669 
1670  /*
1671  * Remember the xid for the message in streaming mode. See
1672  * pgoutput_change.
1673  */
1674  if (in_streaming)
1675  xid = txn->xid;
1676 
1677  /*
1678  * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1679  */
1680  if (transactional)
1681  {
1682  PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1683 
1684  /* Send BEGIN if we haven't yet */
1685  if (txndata && !txndata->sent_begin_txn)
1686  pgoutput_send_begin(ctx, txn);
1687  }
1688 
1689  OutputPluginPrepareWrite(ctx, true);
1690  logicalrep_write_message(ctx->out,
1691  xid,
1692  message_lsn,
1693  transactional,
1694  prefix,
1695  sz,
1696  message);
1697  OutputPluginWrite(ctx, true);
1698 }
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:642

References data, if(), in_streaming, InvalidTransactionId, logicalrep_write_message(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), pgoutput_send_begin(), PGOutputTxnData::sent_begin_txn, update_replication_progress(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_origin_filter()

static bool pgoutput_origin_filter ( LogicalDecodingContext ctx,
RepOriginId  origin_id 
)
static

Definition at line 1704 of file pgoutput.c.

1706 {
1707  return false;
1708 }

Referenced by _PG_output_plugin_init().

◆ pgoutput_prepare_txn()

static void pgoutput_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 598 of file pgoutput.c.

600 {
601  update_replication_progress(ctx, false);
602 
603  OutputPluginPrepareWrite(ctx, true);
604  logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
605  OutputPluginWrite(ctx, true);
606 }
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:198

References logicalrep_write_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and update_replication_progress().

Referenced by _PG_output_plugin_init().

◆ pgoutput_rollback_prepared_txn()

static void pgoutput_rollback_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)
static

Definition at line 626 of file pgoutput.c.

630 {
631  update_replication_progress(ctx, false);
632 
633  OutputPluginPrepareWrite(ctx, true);
634  logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
635  prepare_time);
636  OutputPluginWrite(ctx, true);
637 }
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: proto.c:304

References logicalrep_write_rollback_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and update_replication_progress().

Referenced by _PG_output_plugin_init().

◆ pgoutput_row_filter()

static bool pgoutput_row_filter ( Relation  relation,
TupleTableSlot old_slot,
TupleTableSlot **  new_slot_ptr,
RelationSyncEntry entry,
ReorderBufferChangeType action 
)
static

Definition at line 1171 of file pgoutput.c.

1174 {
1175  TupleDesc desc;
1176  int i;
1177  bool old_matched,
1178  new_matched,
1179  result;
1180  TupleTableSlot *tmp_new_slot;
1181  TupleTableSlot *new_slot = *new_slot_ptr;
1182  ExprContext *ecxt;
1183  ExprState *filter_exprstate;
1184 
1185  /*
1186  * We need this map to avoid relying on ReorderBufferChangeType enums
1187  * having specific values.
1188  */
1189  static const int map_changetype_pubaction[] = {
1193  };
1194 
1198 
1199  Assert(new_slot || old_slot);
1200 
1201  /* Get the corresponding row filter */
1202  filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1203 
1204  /* Bail out if there is no row filter */
1205  if (!filter_exprstate)
1206  return true;
1207 
1208  elog(DEBUG3, "table \"%s.%s\" has row filter",
1210  RelationGetRelationName(relation));
1211 
1213 
1214  ecxt = GetPerTupleExprContext(entry->estate);
1215 
1216  /*
1217  * For the following occasions where there is only one tuple, we can
1218  * evaluate the row filter for that tuple and return.
1219  *
1220  * For inserts, we only have the new tuple.
1221  *
1222  * For updates, we can have only a new tuple when none of the replica
1223  * identity columns changed and none of those columns have external data
1224  * but we still need to evaluate the row filter for the new tuple as the
1225  * existing values of those columns might not match the filter. Also,
1226  * users can use constant expressions in the row filter, so we anyway need
1227  * to evaluate it for the new tuple.
1228  *
1229  * For deletes, we only have the old tuple.
1230  */
1231  if (!new_slot || !old_slot)
1232  {
1233  ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1234  result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1235 
1236  return result;
1237  }
1238 
1239  /*
1240  * Both the old and new tuples must be valid only for updates and need to
1241  * be checked against the row filter.
1242  */
1243  Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1244 
1245  slot_getallattrs(new_slot);
1246  slot_getallattrs(old_slot);
1247 
1248  tmp_new_slot = NULL;
1249  desc = RelationGetDescr(relation);
1250 
1251  /*
1252  * The new tuple might not have all the replica identity columns, in which
1253  * case it needs to be copied over from the old tuple.
1254  */
1255  for (i = 0; i < desc->natts; i++)
1256  {
1257  Form_pg_attribute att = TupleDescAttr(desc, i);
1258 
1259  /*
1260  * if the column in the new tuple or old tuple is null, nothing to do
1261  */
1262  if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1263  continue;
1264 
1265  /*
1266  * Unchanged toasted replica identity columns are only logged in the
1267  * old tuple. Copy this over to the new tuple. The changed (or WAL
1268  * Logged) toast values are always assembled in memory and set as
1269  * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1270  */
1271  if (att->attlen == -1 &&
1272  VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
1273  !VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
1274  {
1275  if (!tmp_new_slot)
1276  {
1277  tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1278  ExecClearTuple(tmp_new_slot);
1279 
1280  memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1281  desc->natts * sizeof(Datum));
1282  memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1283  desc->natts * sizeof(bool));
1284  }
1285 
1286  tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1287  tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1288  }
1289  }
1290 
1291  ecxt->ecxt_scantuple = old_slot;
1292  old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1293 
1294  if (tmp_new_slot)
1295  {
1296  ExecStoreVirtualTuple(tmp_new_slot);
1297  ecxt->ecxt_scantuple = tmp_new_slot;
1298  }
1299  else
1300  ecxt->ecxt_scantuple = new_slot;
1301 
1302  new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1303 
1304  /*
1305  * Case 1: if both tuples don't match the row filter, bailout. Send
1306  * nothing.
1307  */
1308  if (!old_matched && !new_matched)
1309  return false;
1310 
1311  /*
1312  * Case 2: if the old tuple doesn't satisfy the row filter but the new
1313  * tuple does, transform the UPDATE into INSERT.
1314  *
1315  * Use the newly transformed tuple that must contain the column values for
1316  * all the replica identity columns. This is required to ensure that,
1317  * while inserting the tuple in the downstream node, we have all the
1318  * required column values.
1319  */
1320  if (!old_matched && new_matched)
1321  {
1323 
1324  if (tmp_new_slot)
1325  *new_slot_ptr = tmp_new_slot;
1326  }
1327 
1328  /*
1329  * Case 3: if the old tuple satisfies the row filter but the new tuple
1330  * doesn't, transform the UPDATE into DELETE.
1331  *
1332  * This transformation does not require another tuple. The Old tuple will
1333  * be used for DELETE.
1334  */
1335  else if (old_matched && !new_matched)
1337 
1338  /*
1339  * Case 4: if both tuples match the row filter, transformation isn't
1340  * required. (*action is default UPDATE).
1341  */
1342 
1343  return true;
1344 }
#define DEBUG3
Definition: elog.h:22
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1552
#define ResetPerTupleExprContext(estate)
Definition: executor.h:546
#define GetPerTupleExprContext(estate)
Definition: executor.h:537
int i
Definition: isn.c:73
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3326
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
Definition: pgoutput.c:784
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: postgres.h:327
#define RelationGetNamespace(relation)
Definition: rel.h:530
TupleTableSlot * ecxt_scantuple
Definition: execnodes.h:232
bool * tts_isnull
Definition: tuptable.h:128
Datum * tts_values
Definition: tuptable.h:126
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:354

References generate_unaccent_rules::action, Assert(), DEBUG3, ExprContext::ecxt_scantuple, elog, RelationSyncEntry::estate, ExecClearTuple(), ExecStoreVirtualTuple(), RelationSyncEntry::exprstate, get_namespace_name(), GetPerTupleExprContext, i, MakeSingleTupleTableSlot(), TupleDescData::natts, pgoutput_row_filter_exec_expr(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, RelationGetDescr, RelationGetNamespace, RelationGetRelationName, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ResetPerTupleExprContext, slot_getallattrs(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_values, TTSOpsVirtual, TupleDescAttr, and VARATT_IS_EXTERNAL_ONDISK.

Referenced by pgoutput_change().

◆ pgoutput_row_filter_exec_expr()

static bool pgoutput_row_filter_exec_expr ( ExprState state,
ExprContext econtext 
)
static

Definition at line 784 of file pgoutput.c.

785 {
786  Datum ret;
787  bool isnull;
788 
789  Assert(state != NULL);
790 
791  ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
792 
793  elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
794  isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
795  isnull ? "true" : "false");
796 
797  if (isnull)
798  return false;
799 
800  return DatumGetBool(ret);
801 }
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:335
#define DatumGetBool(X)
Definition: postgres.h:437
Definition: regguts.h:318

References Assert(), DatumGetBool, DEBUG3, elog, and ExecEvalExprSwitchContext().

Referenced by pgoutput_row_filter().

◆ pgoutput_row_filter_init()

static void pgoutput_row_filter_init ( PGOutputData data,
List publications,
RelationSyncEntry entry 
)
static

Definition at line 829 of file pgoutput.c.

831 {
832  ListCell *lc;
833  List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
834  bool no_filter[] = {false, false, false}; /* One per pubaction */
835  MemoryContext oldctx;
836  int idx;
837  bool has_filter = true;
838 
839  /*
840  * Find if there are any row filters for this relation. If there are, then
841  * prepare the necessary ExprState and cache it in entry->exprstate. To
842  * build an expression state, we need to ensure the following:
843  *
844  * All the given publication-table mappings must be checked.
845  *
846  * Multiple publications might have multiple row filters for this
847  * relation. Since row filter usage depends on the DML operation, there
848  * are multiple lists (one for each operation) to which row filters will
849  * be appended.
850  *
851  * FOR ALL TABLES implies "don't use row filter expression" so it takes
852  * precedence.
853  */
854  foreach(lc, publications)
855  {
856  Publication *pub = lfirst(lc);
857  HeapTuple rftuple = NULL;
858  Datum rfdatum = 0;
859  bool pub_no_filter = false;
860 
861  if (pub->alltables)
862  {
863  /*
864  * If the publication is FOR ALL TABLES then it is treated the
865  * same as if this table has no row filters (even if for other
866  * publications it does).
867  */
868  pub_no_filter = true;
869  }
870  else
871  {
872  /*
873  * Check for the presence of a row filter in this publication.
874  */
877  ObjectIdGetDatum(pub->oid));
878 
879  if (HeapTupleIsValid(rftuple))
880  {
881  /* Null indicates no filter. */
882  rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
883  Anum_pg_publication_rel_prqual,
884  &pub_no_filter);
885  }
886  else
887  {
888  pub_no_filter = true;
889  }
890  }
891 
892  if (pub_no_filter)
893  {
894  if (rftuple)
895  ReleaseSysCache(rftuple);
896 
897  no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
898  no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
899  no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
900 
901  /*
902  * Quick exit if all the DML actions are publicized via this
903  * publication.
904  */
905  if (no_filter[PUBACTION_INSERT] &&
906  no_filter[PUBACTION_UPDATE] &&
907  no_filter[PUBACTION_DELETE])
908  {
909  has_filter = false;
910  break;
911  }
912 
913  /* No additional work for this publication. Next one. */
914  continue;
915  }
916 
917  /* Form the per pubaction row filter lists. */
918  if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
919  rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
920  TextDatumGetCString(rfdatum));
921  if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
922  rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
923  TextDatumGetCString(rfdatum));
924  if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
925  rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
926  TextDatumGetCString(rfdatum));
927 
928  ReleaseSysCache(rftuple);
929  } /* loop all subscribed publications */
930 
931  /* Clean the row filter */
932  for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
933  {
934  if (no_filter[idx])
935  {
936  list_free_deep(rfnodes[idx]);
937  rfnodes[idx] = NIL;
938  }
939  }
940 
941  if (has_filter)
942  {
944 
946 
947  /*
948  * Now all the filters for all pubactions are known. Combine them when
949  * their pubactions are the same.
950  */
951  oldctx = MemoryContextSwitchTo(entry->entry_cxt);
952  entry->estate = create_estate_for_relation(relation);
953  for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
954  {
955  List *filters = NIL;
956  Expr *rfnode;
957 
958  if (rfnodes[idx] == NIL)
959  continue;
960 
961  foreach(lc, rfnodes[idx])
962  filters = lappend(filters, stringToNode((char *) lfirst(lc)));
963 
964  /* combine the row filter and cache the ExprState */
965  rfnode = make_orclause(filters);
966  entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
967  } /* for each pubaction */
968  MemoryContextSwitchTo(oldctx);
969 
970  RelationClose(relation);
971  }
972 }
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:259
#define TextDatumGetCString(d)
Definition: builtins.h:86
ExprState * ExecPrepareExpr(Expr *node, EState *estate)
Definition: execExpr.c:746
Expr * make_orclause(List *orclauses)
Definition: makefuncs.c:653
#define NUM_ROWFILTER_PUBACTIONS
Definition: pgoutput.c:108
static EState * create_estate_for_relation(Relation rel)
Definition: pgoutput.c:758
void * stringToNode(const char *str)
Definition: read.c:89

References Publication::alltables, create_estate_for_relation(), data, RelationSyncEntry::entry_cxt, RelationSyncEntry::estate, ExecPrepareExpr(), RelationSyncEntry::exprstate, HeapTupleIsValid, idx(), lappend(), lfirst, list_free_deep(), make_orclause(), MemoryContextSwitchTo(), NIL, NUM_ROWFILTER_PUBACTIONS, ObjectIdGetDatum, Publication::oid, pgoutput_ensure_entry_cxt(), PUBACTION_DELETE, PUBACTION_INSERT, PUBACTION_UPDATE, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, PUBLICATIONRELMAP, RelationSyncEntry::publish_as_relid, PublicationActions::pubupdate, RelationClose(), RelationIdGetRelation(), ReleaseSysCache(), SearchSysCache2(), stringToNode(), SysCacheGetAttr(), and TextDatumGetCString.

Referenced by get_rel_sync_entry().

◆ pgoutput_send_begin()

static void pgoutput_send_begin ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 527 of file pgoutput.c.

528 {
529  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
531 
532  Assert(txndata);
533  Assert(!txndata->sent_begin_txn);
534 
535  OutputPluginPrepareWrite(ctx, !send_replication_origin);
536  logicalrep_write_begin(ctx->out, txn);
537  txndata->sent_begin_txn = true;
538 
539  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
540  send_replication_origin);
541 
542  OutputPluginWrite(ctx, true);
543 }
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:60

References Assert(), InvalidRepOriginId, logicalrep_write_begin(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), send_repl_origin(), and PGOutputTxnData::sent_begin_txn.

Referenced by pgoutput_change(), pgoutput_message(), and pgoutput_truncate().

◆ pgoutput_shutdown()

static void pgoutput_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 1718 of file pgoutput.c.

1719 {
1720  if (RelationSyncCache)
1721  {
1723  RelationSyncCache = NULL;
1724  }
1725 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:862

References hash_destroy(), and RelationSyncCache.

Referenced by _PG_output_plugin_init().

◆ pgoutput_startup()

static void pgoutput_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 392 of file pgoutput.c.

394 {
396 
397  /* Create our memory context for private allocations. */
398  data->context = AllocSetContextCreate(ctx->context,
399  "logical replication output context",
401 
402  data->cachectx = AllocSetContextCreate(ctx->context,
403  "logical replication cache context",
405 
407 
408  /* This plugin uses binary protocol. */
410 
411  /*
412  * This is replication start and not slot initialization.
413  *
414  * Parse and validate options passed by the client.
415  */
416  if (!is_init)
417  {
418  /* Parse the params and ERROR if we see any we don't recognize */
420 
421  /* Check if we support requested protocol */
422  if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
423  ereport(ERROR,
424  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
425  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
426  data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
427 
428  if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
429  ereport(ERROR,
430  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
431  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
432  data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
433 
434  if (list_length(data->publication_names) < 1)
435  ereport(ERROR,
436  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
437  errmsg("publication_names parameter missing")));
438 
439  /*
440  * Decide whether to enable streaming. It is disabled by default, in
441  * which case we just update the flag in decoding context. Otherwise
442  * we only allow it with sufficient version of the protocol, and when
443  * the output plugin supports it.
444  */
445  if (!data->streaming)
446  ctx->streaming = false;
447  else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
448  ereport(ERROR,
449  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
450  errmsg("requested proto_version=%d does not support streaming, need %d or higher",
451  data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
452  else if (!ctx->streaming)
453  ereport(ERROR,
454  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
455  errmsg("streaming requested, but not supported by output plugin")));
456 
457  /* Also remember we're currently not streaming any transaction. */
458  in_streaming = false;
459 
460  /*
461  * Here, we just check whether the two-phase option is passed by
462  * plugin and decide whether to enable it at a later point in time. It
463  * remains enabled if the previous start-up has done so. But we only
464  * allow the option to be passed in with sufficient version of the
465  * protocol, and when the output plugin supports it.
466  */
467  if (!data->two_phase)
468  ctx->twophase_opt_given = false;
469  else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
470  ereport(ERROR,
471  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
472  errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
473  data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
474  else if (!ctx->twophase)
475  ereport(ERROR,
476  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
477  errmsg("two-phase commit requested, but not supported by output plugin")));
478  else
479  ctx->twophase_opt_given = true;
480 
481  /* Init publication state. */
482  data->publications = NIL;
483  publications_valid = false;
486  (Datum) 0);
487 
488  /* Initialize relation schema cache. */
490  }
491  else
492  {
493  /*
494  * Disable the streaming and prepared transactions during the slot
495  * initialization mode.
496  */
497  ctx->streaming = false;
498  ctx->twophase = false;
499  }
500 }
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:36
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:38
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:39
#define LOGICALREP_PROTO_MAX_VERSION_NUM
Definition: logicalproto.h:40
void * palloc0(Size size)
Definition: mcxt.c:1099
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
static void parse_output_parameters(List *options, PGOutputData *data)
Definition: pgoutput.c:281
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:1893
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1753
List * output_plugin_options
Definition: logical.h:59
OutputPluginOutputType output_type
Definition: output_plugin.h:28
@ PUBLICATIONOID
Definition: syscache.h:83

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CacheMemoryContext, CacheRegisterSyscacheCallback(), LogicalDecodingContext::context, data, ereport, errcode(), errmsg(), ERROR, in_streaming, init_rel_sync_cache(), list_length(), LOGICALREP_PROTO_MAX_VERSION_NUM, LOGICALREP_PROTO_MIN_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OutputPluginOptions::output_type, palloc0(), parse_output_parameters(), publication_invalidation_cb(), PUBLICATIONOID, publications_valid, LogicalDecodingContext::streaming, LogicalDecodingContext::twophase, and LogicalDecodingContext::twophase_opt_given.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_abort()

static void pgoutput_stream_abort ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 1818 of file pgoutput.c.

1821 {
1822  ReorderBufferTXN *toptxn;
1823 
1824  /*
1825  * The abort should happen outside streaming block, even for streamed
1826  * transactions. The transaction has to be marked as streamed, though.
1827  */
1828  Assert(!in_streaming);
1829 
1830  /* determine the toplevel transaction */
1831  toptxn = (txn->toptxn) ? txn->toptxn : txn;
1832 
1833  Assert(rbtxn_is_streamed(toptxn));
1834 
1835  OutputPluginPrepareWrite(ctx, true);
1836  logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
1837  OutputPluginWrite(ctx, true);
1838 
1839  cleanup_rel_sync_cache(toptxn->xid, false);
1840 }
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:2232
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid)
Definition: proto.c:1168
#define rbtxn_is_streamed(txn)

References Assert(), cleanup_rel_sync_cache(), in_streaming, logicalrep_write_stream_abort(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, ReorderBufferTXN::toptxn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_commit()

static void pgoutput_stream_commit ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 1847 of file pgoutput.c.

1850 {
1851  /*
1852  * The commit should happen outside streaming block, even for streamed
1853  * transactions. The transaction has to be marked as streamed, though.
1854  */
1855  Assert(!in_streaming);
1856  Assert(rbtxn_is_streamed(txn));
1857 
1858  update_replication_progress(ctx, false);
1859 
1860  OutputPluginPrepareWrite(ctx, true);
1861  logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1862  OutputPluginWrite(ctx, true);
1863 
1864  cleanup_rel_sync_cache(txn->xid, true);
1865 }
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:1117

References Assert(), cleanup_rel_sync_cache(), in_streaming, logicalrep_write_stream_commit(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, update_replication_progress(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_prepare_txn()

static void pgoutput_stream_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 1873 of file pgoutput.c.

1876 {
1877  Assert(rbtxn_is_streamed(txn));
1878 
1879  update_replication_progress(ctx, false);
1880  OutputPluginPrepareWrite(ctx, true);
1881  logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1882  OutputPluginWrite(ctx, true);
1883 }
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:364

References Assert(), logicalrep_write_stream_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, and update_replication_progress().

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_start()

static void pgoutput_stream_start ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 1768 of file pgoutput.c.

1770 {
1771  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1772 
1773  /* we can't nest streaming of transactions */
1774  Assert(!in_streaming);
1775 
1776  /*
1777  * If we already sent the first stream for this transaction then don't
1778  * send the origin id in the subsequent streams.
1779  */
1780  if (rbtxn_is_streamed(txn))
1781  send_replication_origin = false;
1782 
1783  OutputPluginPrepareWrite(ctx, !send_replication_origin);
1785 
1787  send_replication_origin);
1788 
1789  OutputPluginWrite(ctx, true);
1790 
1791  /* we're streaming a chunk of transaction now */
1792  in_streaming = true;
1793 }
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:1074
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References Assert(), in_streaming, InvalidRepOriginId, InvalidXLogRecPtr, logicalrep_write_stream_start(), ReorderBufferTXN::origin_id, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, send_repl_origin(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ pgoutput_stream_stop()

static void pgoutput_stream_stop ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 1799 of file pgoutput.c.

1801 {
1802  /* we should be streaming a transaction */
1804 
1805  OutputPluginPrepareWrite(ctx, true);
1807  OutputPluginWrite(ctx, true);
1808 
1809  /* we've stopped streaming a transaction */
1810  in_streaming = false;
1811 }
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:1108

References Assert(), in_streaming, logicalrep_write_stream_stop(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

◆ pgoutput_truncate()

static void pgoutput_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 1588 of file pgoutput.c.

1590 {
1593  MemoryContext old;
1594  RelationSyncEntry *relentry;
1595  int i;
1596  int nrelids;
1597  Oid *relids;
1599 
1600  update_replication_progress(ctx, false);
1601 
1602  /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1603  if (in_streaming)
1604  xid = change->txn->xid;
1605 
1606  old = MemoryContextSwitchTo(data->context);
1607 
1608  relids = palloc0(nrelations * sizeof(Oid));
1609  nrelids = 0;
1610 
1611  for (i = 0; i < nrelations; i++)
1612  {
1613  Relation relation = relations[i];
1614  Oid relid = RelationGetRelid(relation);
1615 
1616  if (!is_publishable_relation(relation))
1617  continue;
1618 
1619  relentry = get_rel_sync_entry(data, relation);
1620 
1621  if (!relentry->pubactions.pubtruncate)
1622  continue;
1623 
1624  /*
1625  * Don't send partitions if the publication wants to send only the
1626  * root tables through it.
1627  */
1628  if (relation->rd_rel->relispartition &&
1629  relentry->publish_as_relid != relid)
1630  continue;
1631 
1632  relids[nrelids++] = relid;
1633 
1634  /* Send BEGIN if we haven't yet */
1635  if (txndata && !txndata->sent_begin_txn)
1636  pgoutput_send_begin(ctx, txn);
1637 
1638  maybe_send_schema(ctx, change, relation, relentry);
1639  }
1640 
1641  if (nrelids > 0)
1642  {
1643  OutputPluginPrepareWrite(ctx, true);
1645  xid,
1646  nrelids,
1647  relids,
1648  change->data.truncate.cascade,
1649  change->data.truncate.restart_seqs);
1650  OutputPluginWrite(ctx, true);
1651  }
1652 
1653  MemoryContextSwitchTo(old);
1654  MemoryContextReset(data->context);
1655 }
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:585
struct ReorderBufferChange::@105::@107 truncate

References ReorderBufferChange::data, data, get_rel_sync_entry(), i, in_streaming, InvalidTransactionId, is_publishable_relation(), logicalrep_write_truncate(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, ReorderBufferTXN::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), palloc0(), pgoutput_send_begin(), RelationSyncEntry::pubactions, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, RelationData::rd_rel, RelationGetRelid, ReorderBufferChange::truncate, ReorderBufferChange::txn, update_replication_progress(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

◆ publication_invalidation_cb()

static void publication_invalidation_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1753 of file pgoutput.c.

1754 {
1755  publications_valid = false;
1756 
1757  /*
1758  * Also invalidate per-relation cache so that next time the filtering info
1759  * is checked it will be updated with the new publication settings.
1760  */
1761  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
1762 }
void * arg

References arg, publications_valid, and rel_sync_cache_publication_cb().

Referenced by pgoutput_startup().

◆ rel_sync_cache_publication_cb()

static void rel_sync_cache_publication_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 2319 of file pgoutput.c.

2320 {
2322  RelationSyncEntry *entry;
2323 
2324  /*
2325  * We can get here if the plugin was used in SQL interface as the
2326  * RelationSyncCache is destroyed when the decoding finishes, but there
2327  * is no way to unregister the invalidation callback.
2328  */
2329  if (RelationSyncCache == NULL)
2330  return;
2331 
2332  /*
2333  * There is no way to find which entry in our cache the hash belongs to so
2334  * mark the whole cache as invalid.
2335  */
2337  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2338  {
2339  entry->replicate_valid = false;
2340  }
2341 }
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:229

References hash_seq_init(), hash_seq_search(), RelationSyncCache, RelationSyncEntry::replicate_valid, and status().

Referenced by init_rel_sync_cache(), and publication_invalidation_cb().

◆ rel_sync_cache_relation_cb()

static void rel_sync_cache_relation_cb ( Datum  arg,
Oid  relid 
)
static

Definition at line 2268 of file pgoutput.c.

2269 {
2270  RelationSyncEntry *entry;
2271 
2272  /*
2273  * We can get here if the plugin was used in SQL interface as the
2274  * RelationSyncCache is destroyed when the decoding finishes, but there
2275  * is no way to unregister the relcache invalidation callback.
2276  */
2277  if (RelationSyncCache == NULL)
2278  return;
2279 
2280  /*
2281  * Nobody keeps pointers to entries in this hash table around outside
2282  * logical decoding callback calls - but invalidation events can come in
2283  * *during* a callback if we do any syscache access in the callback.
2284  * Because of that we must mark the cache entry as invalid but not damage
2285  * any of its substructure here. The next get_rel_sync_entry() call will
2286  * rebuild it all.
2287  */
2288  if (OidIsValid(relid))
2289  {
2290  /*
2291  * Getting invalidations for relations that aren't in the table is
2292  * entirely normal. So we don't care if it's found or not.
2293  */
2294  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2295  HASH_FIND, NULL);
2296  if (entry != NULL)
2297  entry->replicate_valid = false;
2298  }
2299  else
2300  {
2301  /* Whole cache must be flushed. */
2303 
2305  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2306  {
2307  entry->replicate_valid = false;
2308  }
2309  }
2310 }
#define OidIsValid(objectId)
Definition: c.h:710
@ HASH_FIND
Definition: hsearch.h:113

References HASH_FIND, hash_search(), hash_seq_init(), hash_seq_search(), OidIsValid, RelationSyncCache, RelationSyncEntry::replicate_valid, and status().

Referenced by init_rel_sync_cache().

◆ send_relation_and_attrs()

static void send_relation_and_attrs ( Relation  relation,
TransactionId  xid,
LogicalDecodingContext * ctx,
Bitmapset * columns 
)
static

Definition at line 714 of file pgoutput.c.

717 {
718  TupleDesc desc = RelationGetDescr(relation);
719  int i;
720 
721  /*
722  * Write out type info if needed. We do that only for user-created types.
723  * We use FirstGenbkiObjectId as the cutoff, so that we only consider
724  * objects with hand-assigned OIDs to be "built in", not for instance any
725  * function or type defined in the information_schema. This is important
726  * because only hand-assigned OIDs can be expected to remain stable across
727  * major versions.
728  */
729  for (i = 0; i < desc->natts; i++)
730  {
731  Form_pg_attribute att = TupleDescAttr(desc, i);
732 
733  if (att->attisdropped || att->attgenerated)
734  continue;
735 
736  if (att->atttypid < FirstGenbkiObjectId)
737  continue;
738 
739  /* Skip this attribute if it's not present in the column list */
740  if (columns != NULL && !bms_is_member(att->attnum, columns))
741  continue;
742 
743  OutputPluginPrepareWrite(ctx, false);
744  logicalrep_write_typ(ctx->out, xid, att->atttypid);
745  OutputPluginWrite(ctx, false);
746  }
747 
748  OutputPluginPrepareWrite(ctx, false);
749  logicalrep_write_rel(ctx->out, xid, relation, columns);
750  OutputPluginWrite(ctx, false);
751 }
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:427
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns)
Definition: proto.c:669
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:724
#define FirstGenbkiObjectId
Definition: transam.h:195

References bms_is_member(), FirstGenbkiObjectId, i, logicalrep_write_rel(), logicalrep_write_typ(), TupleDescData::natts, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetDescr, and TupleDescAttr.

Referenced by maybe_send_schema().

◆ send_repl_origin()

static void send_repl_origin ( LogicalDecodingContext * ctx,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
bool  send_origin 
)
static

Definition at line 2345 of file pgoutput.c.

2347 {
2348  if (send_origin)
2349  {
2350  char *origin;
2351 
2352  /*----------
2353  * XXX: which behaviour do we want here?
2354  *
2355  * Alternatives:
2356  * - don't send origin message if origin name not found
2357  * (that's what we do now)
2358  * - throw error - that will break replication, not good
2359  * - send some special "unknown" origin
2360  *----------
2361  */
2362  if (replorigin_by_oid(origin_id, true, &origin))
2363  {
2364  /* Message boundary */
2365  OutputPluginWrite(ctx, false);
2366  OutputPluginPrepareWrite(ctx, true);
2367 
2368  logicalrep_write_origin(ctx->out, origin, origin_lsn);
2369  }
2370  }
2371 }
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:449
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:385

References logicalrep_write_origin(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and replorigin_by_oid().

Referenced by pgoutput_begin_prepare_txn(), pgoutput_send_begin(), and pgoutput_stream_start().

◆ set_schema_sent_in_streamed_txn()

static void set_schema_sent_in_streamed_txn ( RelationSyncEntry * entry,
TransactionId  xid 
)
static

Definition at line 1942 of file pgoutput.c.

1943 {
1944  MemoryContext oldctx;
1945 
1946  oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1947 
1948  entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
1949 
1950  MemoryContextSwitchTo(oldctx);
1951 }
List * lappend_int(List *list, int datum)
Definition: list.c:354

References CacheMemoryContext, lappend_int(), MemoryContextSwitchTo(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

◆ update_replication_progress()

static void update_replication_progress ( LogicalDecodingContext * ctx,
bool  skipped_xact 
)
static

Definition at line 2383 of file pgoutput.c.

2384 {
2385  static int changes_count = 0;
2386 
2387  /*
2388  * We don't want to try sending a keepalive message after processing each
2389  * change as that can have overhead. Tests revealed that there is no
2390  * noticeable overhead in doing it after continuously processing 100 or so
2391  * changes.
2392  */
2393 #define CHANGES_THRESHOLD 100
2394 
2395  /*
2396  * If we are at the end of transaction LSN, update progress tracking.
2397  * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
2398  * try to send a keepalive message if required.
2399  */
2400  if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
2401  {
2402  OutputPluginUpdateProgress(ctx, skipped_xact);
2403  changes_count = 0;
2404  }
2405 }
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact)
Definition: logical.c:675
#define CHANGES_THRESHOLD

References CHANGES_THRESHOLD, LogicalDecodingContext::end_xact, and OutputPluginUpdateProgress().

Referenced by pgoutput_change(), pgoutput_commit_prepared_txn(), pgoutput_commit_txn(), pgoutput_message(), pgoutput_prepare_txn(), pgoutput_rollback_prepared_txn(), pgoutput_stream_commit(), pgoutput_stream_prepare_txn(), and pgoutput_truncate().

Variable Documentation

◆ in_streaming

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 36 of file pgoutput.c.

◆ publications_valid

bool publications_valid
static

Definition at line 82 of file pgoutput.c.

Referenced by get_rel_sync_entry(), pgoutput_startup(), and publication_invalidation_cb().

◆ RelationSyncCache