PostgreSQL Source Code  git master
pgoutput.c File Reference
#include "postgres.h"
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "commands/defrem.h"
#include "fmgr.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
#include "utils/int8.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
Include dependency graph for pgoutput.c:

Go to the source code of this file.

Data Structures

struct  RelationSyncEntry
 

Typedefs

typedef struct RelationSyncEntry RelationSyncEntry
 

Functions

void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void pgoutput_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pgoutput_shutdown (LogicalDecodingContext *ctx)
 
static void pgoutput_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
 
static void pgoutput_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static bool pgoutput_origin_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pgoutput_stream_start (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_stop (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_abort (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pgoutput_stream_commit (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static ListLoadPublications (List *pubnames)
 
static void publication_invalidation_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void send_relation_and_attrs (Relation relation, TransactionId xid, LogicalDecodingContext *ctx)
 
static void init_rel_sync_cache (MemoryContext decoding_context)
 
static void cleanup_rel_sync_cache (TransactionId xid, bool is_commit)
 
static RelationSyncEntryget_rel_sync_entry (PGOutputData *data, Oid relid)
 
static void rel_sync_cache_relation_cb (Datum arg, Oid relid)
 
static void rel_sync_cache_publication_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void set_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static bool get_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static void parse_output_parameters (List *options, uint32 *protocol_version, List **publication_names, bool *binary, bool *enable_streaming)
 
static void maybe_send_schema (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
 

Variables

 PG_MODULE_MAGIC
 
static bool publications_valid
 
static bool in_streaming
 
static HTABRelationSyncCache = NULL
 

Typedef Documentation

◆ RelationSyncEntry

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks cb)

Definition at line 137 of file pgoutput.c.

References AssertVariableIsOfType, OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::filter_by_origin_cb, pgoutput_begin_txn(), pgoutput_change(), pgoutput_commit_txn(), pgoutput_origin_filter(), pgoutput_shutdown(), pgoutput_startup(), pgoutput_stream_abort(), pgoutput_stream_commit(), pgoutput_stream_start(), pgoutput_stream_stop(), pgoutput_truncate(), OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

138 {
140 
148 
149  /* transaction streaming */
156 }
LogicalDecodeTruncateCB truncate_cb
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:498
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:137
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:751
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:703
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:370
LogicalDecodeStreamAbortCB stream_abort_cb
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:839
LogicalDecodeCommitCB commit_cb
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:244
LogicalDecodeChangeCB change_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:690
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeStreamStartCB stream_start_cb
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:791
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamStopCB stream_stop_cb
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:625
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:334
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeStreamChangeCB stream_change_cb
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: pgoutput.c:810
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:904

◆ cleanup_rel_sync_cache()

static void cleanup_rel_sync_cache ( TransactionId  xid,
bool  is_commit 
)
static

Definition at line 1079 of file pgoutput.c.

References Assert, foreach_delete_current, hash_seq_init(), hash_seq_search(), lfirst_int, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_stream_abort(), and pgoutput_stream_commit().

1080 {
1081  HASH_SEQ_STATUS hash_seq;
1082  RelationSyncEntry *entry;
1083  ListCell *lc;
1084 
1085  Assert(RelationSyncCache != NULL);
1086 
1087  hash_seq_init(&hash_seq, RelationSyncCache);
1088  while ((entry = hash_seq_search(&hash_seq)) != NULL)
1089  {
1090  /*
1091  * We can set the schema_sent flag for an entry that has committed xid
1092  * in the list as that ensures that the subscriber would have the
1093  * corresponding schema and we don't need to send it unless there is
1094  * any invalidation for that relation.
1095  */
1096  foreach(lc, entry->streamed_txns)
1097  {
1098  if (xid == (uint32) lfirst_int(lc))
1099  {
1100  if (is_commit)
1101  entry->schema_sent = true;
1102 
1103  entry->streamed_txns =
1105  break;
1106  }
1107  }
1108  }
1109 }
List * streamed_txns
Definition: pgoutput.c:96
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:377
#define lfirst_int(lc)
Definition: pg_list.h:190
unsigned int uint32
Definition: c.h:374
static HTAB * RelationSyncCache
Definition: pgoutput.c:120
#define Assert(condition)
Definition: c.h:745
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1401
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1391

◆ get_rel_sync_entry()

static RelationSyncEntry * get_rel_sync_entry ( PGOutputData data,
Oid  relid 
)
static

Definition at line 938 of file pgoutput.c.

References Publication::alltables, Assert, CacheMemoryContext, get_partition_ancestors(), get_rel_relispartition(), get_rel_relkind(), GetRelationPublications(), HASH_ENTER, hash_search(), InvalidOid, lfirst, lfirst_oid, list_free(), list_free_deep(), list_member_oid(), llast_oid, LoadPublications(), MemoryContextSwitchTo(), NIL, Publication::oid, Publication::pubactions, RelationSyncEntry::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, PGOutputData::publication_names, PGOutputData::publications, publications_valid, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, PublicationActions::pubupdate, Publication::pubviaroot, RelationSyncEntry::relid, RelationSyncEntry::replicate_valid, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_change(), and pgoutput_truncate().

939 {
940  RelationSyncEntry *entry;
941  bool am_partition = get_rel_relispartition(relid);
942  char relkind = get_rel_relkind(relid);
943  bool found;
944  MemoryContext oldctx;
945 
946  Assert(RelationSyncCache != NULL);
947 
948  /* Find cached relation info, creating if not found */
950  (void *) &relid,
951  HASH_ENTER, &found);
952  Assert(entry != NULL);
953 
954  /* Not found means schema wasn't sent */
955  if (!found)
956  {
957  /* immediately make a new entry valid enough to satisfy callbacks */
958  entry->schema_sent = false;
959  entry->streamed_txns = NIL;
960  entry->replicate_valid = false;
961  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
962  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
963  entry->publish_as_relid = InvalidOid;
964  }
965 
966  /* Validate the entry */
967  if (!entry->replicate_valid)
968  {
969  List *pubids = GetRelationPublications(relid);
970  ListCell *lc;
971  Oid publish_as_relid = relid;
972 
973  /* Reload publications if needed before use. */
974  if (!publications_valid)
975  {
977  if (data->publications)
979 
981  MemoryContextSwitchTo(oldctx);
982  publications_valid = true;
983  }
984 
985  /*
986  * Build publication cache. We can't use one provided by relcache as
987  * relcache considers all publications given relation is in, but here
988  * we only need to consider ones that the subscriber requested.
989  */
990  foreach(lc, data->publications)
991  {
992  Publication *pub = lfirst(lc);
993  bool publish = false;
994 
995  if (pub->alltables)
996  {
997  publish = true;
998  if (pub->pubviaroot && am_partition)
999  publish_as_relid = llast_oid(get_partition_ancestors(relid));
1000  }
1001 
1002  if (!publish)
1003  {
1004  bool ancestor_published = false;
1005 
1006  /*
1007  * For a partition, check if any of the ancestors are
1008  * published. If so, note down the topmost ancestor that is
1009  * published via this publication, which will be used as the
1010  * relation via which to publish the partition's changes.
1011  */
1012  if (am_partition)
1013  {
1014  List *ancestors = get_partition_ancestors(relid);
1015  ListCell *lc2;
1016 
1017  /*
1018  * Find the "topmost" ancestor that is in this
1019  * publication.
1020  */
1021  foreach(lc2, ancestors)
1022  {
1023  Oid ancestor = lfirst_oid(lc2);
1024 
1026  pub->oid))
1027  {
1028  ancestor_published = true;
1029  if (pub->pubviaroot)
1030  publish_as_relid = ancestor;
1031  }
1032  }
1033  }
1034 
1035  if (list_member_oid(pubids, pub->oid) || ancestor_published)
1036  publish = true;
1037  }
1038 
1039  /*
1040  * Don't publish changes for partitioned tables, because
1041  * publishing those of its partitions suffices, unless partition
1042  * changes won't be published due to pubviaroot being set.
1043  */
1044  if (publish &&
1045  (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
1046  {
1047  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
1048  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
1049  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
1051  }
1052 
1053  if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
1054  entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
1055  break;
1056  }
1057 
1058  list_free(pubids);
1059 
1060  entry->publish_as_relid = publish_as_relid;
1061  entry->replicate_valid = true;
1062  }
1063 
1064  return entry;
1065 }
List * streamed_txns
Definition: pgoutput.c:96
#define NIL
Definition: pg_list.h:65
PublicationActions pubactions
static bool publications_valid
Definition: pgoutput.c:61
bool replicate_valid
Definition: pgoutput.c:99
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1915
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define llast_oid(l)
Definition: pg_list.h:216
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
unsigned int Oid
Definition: postgres_ext.h:31
void list_free_deep(List *list)
Definition: list.c:1390
List * GetRelationPublications(Oid relid)
List * publication_names
Definition: pgoutput.h:25
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:1939
static HTAB * RelationSyncCache
Definition: pgoutput.c:120
#define InvalidOid
Definition: postgres_ext.h:36
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:674
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:189
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:716
List * publications
Definition: pgoutput.h:26
void list_free(List *list)
Definition: list.c:1376
Definition: pg_list.h:50
List * get_partition_ancestors(Oid relid)
Definition: partition.c:115
#define lfirst_oid(lc)
Definition: pg_list.h:191
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
PublicationActions pubactions
Definition: pgoutput.c:100

◆ get_schema_sent_in_streamed_txn()

static bool get_schema_sent_in_streamed_txn ( RelationSyncEntry entry,
TransactionId  xid 
)
static

Definition at line 899 of file pgoutput.c.

References lfirst_int, and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

900 {
901  ListCell *lc;
902 
903  foreach(lc, entry->streamed_txns)
904  {
905  if (xid == (uint32) lfirst_int(lc))
906  return true;
907  }
908 
909  return false;
910 }
List * streamed_txns
Definition: pgoutput.c:96
#define lfirst_int(lc)
Definition: pg_list.h:190
unsigned int uint32
Definition: c.h:374

◆ init_rel_sync_cache()

static void init_rel_sync_cache ( MemoryContext  decoding_context)
static

Definition at line 867 of file pgoutput.c.

References Assert, CacheRegisterRelcacheCallback(), CacheRegisterSyscacheCallback(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, MemoryContextSwitchTo(), MemSet, PUBLICATIONRELMAP, rel_sync_cache_publication_cb(), and rel_sync_cache_relation_cb().

Referenced by pgoutput_startup().

868 {
869  HASHCTL ctl;
870  MemoryContext old_ctxt;
871 
872  if (RelationSyncCache != NULL)
873  return;
874 
875  /* Make a new hash table for the cache */
876  MemSet(&ctl, 0, sizeof(ctl));
877  ctl.keysize = sizeof(Oid);
878  ctl.entrysize = sizeof(RelationSyncEntry);
879  ctl.hcxt = cachectx;
880 
881  old_ctxt = MemoryContextSwitchTo(cachectx);
882  RelationSyncCache = hash_create("logical replication output relation cache",
883  128, &ctl,
885  (void) MemoryContextSwitchTo(old_ctxt);
886 
887  Assert(RelationSyncCache != NULL);
888 
892  (Datum) 0);
893 }
#define HASH_CONTEXT
Definition: hsearch.h:91
#define HASH_ELEM
Definition: hsearch.h:85
MemoryContext hcxt
Definition: hsearch.h:77
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:1115
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:72
#define MemSet(start, val, len)
Definition: c.h:949
unsigned int Oid
Definition: postgres_ext.h:31
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1157
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1476
static HTAB * RelationSyncCache
Definition: pgoutput.c:120
#define HASH_BLOBS
Definition: hsearch.h:86
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
uintptr_t Datum
Definition: postgres.h:367
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:326
Size keysize
Definition: hsearch.h:71
#define Assert(condition)
Definition: c.h:745

◆ LoadPublications()

static List * LoadPublications ( List pubnames)
static

Definition at line 716 of file pgoutput.c.

References GetPublicationByName(), lappend(), lfirst, and NIL.

Referenced by get_rel_sync_entry().

717 {
718  List *result = NIL;
719  ListCell *lc;
720 
721  foreach(lc, pubnames)
722  {
723  char *pubname = (char *) lfirst(lc);
724  Publication *pub = GetPublicationByName(pubname, false);
725 
726  result = lappend(result, pub);
727  }
728 
729  return result;
730 }
#define NIL
Definition: pg_list.h:65
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
List * lappend(List *list, void *datum)
Definition: list.c:321
#define lfirst(lc)
Definition: pg_list.h:189
Definition: pg_list.h:50

◆ maybe_send_schema()

static void maybe_send_schema ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
ReorderBufferChange change,
Relation  relation,
RelationSyncEntry relentry 
)
static

Definition at line 385 of file pgoutput.c.

References CacheMemoryContext, convert_tuples_by_name(), CreateTupleDescCopy(), get_schema_sent_in_streamed_txn(), in_streaming, InvalidTransactionId, RelationSyncEntry::map, MemoryContextSwitchTo(), RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), RelationSyncEntry::schema_sent, send_relation_and_attrs(), set_schema_sent_in_streamed_txn(), ReorderBufferTXN::toptxn, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by pgoutput_change(), and pgoutput_truncate().

388 {
389  bool schema_sent;
392 
393  /*
394  * Remember XID of the (sub)transaction for the change. We don't care if
395  * it's top-level transaction or not (we have already sent that XID in
396  * start of the current streaming block).
397  *
398  * If we're not in a streaming block, just use InvalidTransactionId and
399  * the write methods will not include it.
400  */
401  if (in_streaming)
402  xid = change->txn->xid;
403 
404  if (change->txn->toptxn)
405  topxid = change->txn->toptxn->xid;
406  else
407  topxid = xid;
408 
409  /*
410  * Do we need to send the schema? We do track streamed transactions
411  * separately, because those may be applied later (and the regular
412  * transactions won't see their effects until then) and in an order that
413  * we don't know at this point.
414  *
415  * XXX There is a scope of optimization here. Currently, we always send
416  * the schema first time in a streaming transaction but we can probably
417  * avoid that by checking 'relentry->schema_sent' flag. However, before
418  * doing that we need to study its impact on the case where we have a mix
419  * of streaming and non-streaming transactions.
420  */
421  if (in_streaming)
422  schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
423  else
424  schema_sent = relentry->schema_sent;
425 
426  if (schema_sent)
427  return;
428 
429  /* If needed, send the ancestor's schema first. */
430  if (relentry->publish_as_relid != RelationGetRelid(relation))
431  {
432  Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
433  TupleDesc indesc = RelationGetDescr(relation);
434  TupleDesc outdesc = RelationGetDescr(ancestor);
435  MemoryContext oldctx;
436 
437  /* Map must live as long as the session does. */
439  relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
440  CreateTupleDescCopy(outdesc));
441  MemoryContextSwitchTo(oldctx);
442  send_relation_and_attrs(ancestor, xid, ctx);
443  RelationClose(ancestor);
444  }
445 
446  send_relation_and_attrs(relation, xid, ctx);
447 
448  if (in_streaming)
449  set_schema_sent_in_streamed_txn(relentry, topxid);
450  else
451  relentry->schema_sent = true;
452 }
TupleDesc CreateTupleDescCopy(TupleDesc tupdesc)
Definition: tupdesc.c:110
static bool in_streaming
Definition: pgoutput.c:62
uint32 TransactionId
Definition: c.h:520
#define RelationGetDescr(relation)
Definition: rel.h:482
TupleConversionMap * map
Definition: pgoutput.c:116
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx)
Definition: pgoutput.c:458
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
#define InvalidTransactionId
Definition: transam.h:31
struct ReorderBufferTXN * toptxn
void RelationClose(Relation relation)
Definition: relcache.c:2110
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:899
TransactionId xid
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:917
#define RelationGetRelid(relation)
Definition: rel.h:456
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2004
MemoryContext CacheMemoryContext
Definition: mcxt.c:47

◆ parse_output_parameters()

static void parse_output_parameters ( List options,
uint32 protocol_version,
List **  publication_names,
bool binary,
bool enable_streaming 
)
static

Definition at line 159 of file pgoutput.c.

References DefElem::arg, Assert, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, IsA, lfirst, PG_UINT32_MAX, scanint8(), SplitIdentifierString(), and strVal.

Referenced by pgoutput_startup().

162 {
163  ListCell *lc;
164  bool protocol_version_given = false;
165  bool publication_names_given = false;
166  bool binary_option_given = false;
167  bool streaming_given = false;
168 
169  *binary = false;
170 
171  foreach(lc, options)
172  {
173  DefElem *defel = (DefElem *) lfirst(lc);
174 
175  Assert(defel->arg == NULL || IsA(defel->arg, String));
176 
177  /* Check each param, whether or not we recognize it */
178  if (strcmp(defel->defname, "proto_version") == 0)
179  {
180  int64 parsed;
181 
182  if (protocol_version_given)
183  ereport(ERROR,
184  (errcode(ERRCODE_SYNTAX_ERROR),
185  errmsg("conflicting or redundant options")));
186  protocol_version_given = true;
187 
188  if (!scanint8(strVal(defel->arg), true, &parsed))
189  ereport(ERROR,
190  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
191  errmsg("invalid proto_version")));
192 
193  if (parsed > PG_UINT32_MAX || parsed < 0)
194  ereport(ERROR,
195  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
196  errmsg("proto_version \"%s\" out of range",
197  strVal(defel->arg))));
198 
199  *protocol_version = (uint32) parsed;
200  }
201  else if (strcmp(defel->defname, "publication_names") == 0)
202  {
203  if (publication_names_given)
204  ereport(ERROR,
205  (errcode(ERRCODE_SYNTAX_ERROR),
206  errmsg("conflicting or redundant options")));
207  publication_names_given = true;
208 
209  if (!SplitIdentifierString(strVal(defel->arg), ',',
210  publication_names))
211  ereport(ERROR,
212  (errcode(ERRCODE_INVALID_NAME),
213  errmsg("invalid publication_names syntax")));
214  }
215  else if (strcmp(defel->defname, "binary") == 0)
216  {
217  if (binary_option_given)
218  ereport(ERROR,
219  (errcode(ERRCODE_SYNTAX_ERROR),
220  errmsg("conflicting or redundant options")));
221  binary_option_given = true;
222 
223  *binary = defGetBoolean(defel);
224  }
225  else if (strcmp(defel->defname, "streaming") == 0)
226  {
227  if (streaming_given)
228  ereport(ERROR,
229  (errcode(ERRCODE_SYNTAX_ERROR),
230  errmsg("conflicting or redundant options")));
231  streaming_given = true;
232 
233  *enable_streaming = defGetBoolean(defel);
234  }
235  else
236  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
237  }
238 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:579
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:610
#define PG_UINT32_MAX
Definition: c.h:458
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3702
unsigned int uint32
Definition: c.h:374
Node * arg
Definition: parsenodes.h:734
#define ereport(elevel,...)
Definition: elog.h:144
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:189
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
char * defname
Definition: parsenodes.h:733
bool scanint8(const char *str, bool errorOK, int64 *result)
Definition: int8.c:55

◆ pgoutput_begin_txn()

static void pgoutput_begin_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 334 of file pgoutput.c.

References InvalidRepOriginId, logicalrep_write_begin(), logicalrep_write_origin(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and replorigin_by_oid().

Referenced by _PG_output_plugin_init().

335 {
336  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
337 
338  OutputPluginPrepareWrite(ctx, !send_replication_origin);
339  logicalrep_write_begin(ctx->out, txn);
340 
341  if (send_replication_origin)
342  {
343  char *origin;
344 
345  /* Message boundary */
346  OutputPluginWrite(ctx, false);
347  OutputPluginPrepareWrite(ctx, true);
348 
349  /*----------
350  * XXX: which behaviour do we want here?
351  *
352  * Alternatives:
353  * - don't send origin message if origin name not found
354  * (that's what we do now)
355  * - throw error - that will break replication, not good
356  * - send some special "unknown" origin
357  *----------
358  */
359  if (replorigin_by_oid(txn->origin_id, true, &origin))
360  logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
361  }
362 
363  OutputPluginWrite(ctx, true);
364 }
RepOriginId origin_id
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:112
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:432
XLogRecPtr origin_lsn
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:45
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:584
#define InvalidRepOriginId
Definition: origin.h:33
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:597
StringInfo out
Definition: logical.h:70

◆ pgoutput_change()

static void pgoutput_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  rel,
ReorderBufferChange change 
)
static

Definition at line 498 of file pgoutput.c.

References ReorderBufferChange::action, Assert, PGOutputData::binary, PGOutputData::context, ReorderBufferChange::data, DEBUG1, elog, execute_attr_map_tuple(), get_rel_sync_entry(), in_streaming, InvalidTransactionId, is_publishable_relation(), logicalrep_write_delete(), logicalrep_write_insert(), logicalrep_write_update(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationData::rd_rel, RelationGetRelid, RelationIdGetRelation(), REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

500 {
502  MemoryContext old;
503  RelationSyncEntry *relentry;
505 
506  if (!is_publishable_relation(relation))
507  return;
508 
509  /*
510  * Remember the xid for the change in streaming mode. We need to send xid
511  * with each change in the streaming mode so that subscriber can make
512  * their association and on aborts, it can discard the corresponding
513  * changes.
514  */
515  if (in_streaming)
516  xid = change->txn->xid;
517 
518  relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
519 
520  /* First check the table filter */
521  switch (change->action)
522  {
524  if (!relentry->pubactions.pubinsert)
525  return;
526  break;
528  if (!relentry->pubactions.pubupdate)
529  return;
530  break;
532  if (!relentry->pubactions.pubdelete)
533  return;
534  break;
535  default:
536  Assert(false);
537  }
538 
539  /* Avoid leaking memory by using and resetting our own context */
540  old = MemoryContextSwitchTo(data->context);
541 
542  maybe_send_schema(ctx, txn, change, relation, relentry);
543 
544  /* Send the data */
545  switch (change->action)
546  {
548  {
549  HeapTuple tuple = &change->data.tp.newtuple->tuple;
550 
551  /* Switch relation if publishing via root. */
552  if (relentry->publish_as_relid != RelationGetRelid(relation))
553  {
554  Assert(relation->rd_rel->relispartition);
555  relation = RelationIdGetRelation(relentry->publish_as_relid);
556  /* Convert tuple if needed. */
557  if (relentry->map)
558  tuple = execute_attr_map_tuple(tuple, relentry->map);
559  }
560 
561  OutputPluginPrepareWrite(ctx, true);
562  logicalrep_write_insert(ctx->out, xid, relation, tuple,
563  data->binary);
564  OutputPluginWrite(ctx, true);
565  break;
566  }
568  {
569  HeapTuple oldtuple = change->data.tp.oldtuple ?
570  &change->data.tp.oldtuple->tuple : NULL;
571  HeapTuple newtuple = &change->data.tp.newtuple->tuple;
572 
573  /* Switch relation if publishing via root. */
574  if (relentry->publish_as_relid != RelationGetRelid(relation))
575  {
576  Assert(relation->rd_rel->relispartition);
577  relation = RelationIdGetRelation(relentry->publish_as_relid);
578  /* Convert tuples if needed. */
579  if (relentry->map)
580  {
581  oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
582  newtuple = execute_attr_map_tuple(newtuple, relentry->map);
583  }
584  }
585 
586  OutputPluginPrepareWrite(ctx, true);
587  logicalrep_write_update(ctx->out, xid, relation, oldtuple,
588  newtuple, data->binary);
589  OutputPluginWrite(ctx, true);
590  break;
591  }
593  if (change->data.tp.oldtuple)
594  {
595  HeapTuple oldtuple = &change->data.tp.oldtuple->tuple;
596 
597  /* Switch relation if publishing via root. */
598  if (relentry->publish_as_relid != RelationGetRelid(relation))
599  {
600  Assert(relation->rd_rel->relispartition);
601  relation = RelationIdGetRelation(relentry->publish_as_relid);
602  /* Convert tuple if needed. */
603  if (relentry->map)
604  oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
605  }
606 
607  OutputPluginPrepareWrite(ctx, true);
608  logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
609  data->binary);
610  OutputPluginWrite(ctx, true);
611  }
612  else
613  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
614  break;
615  default:
616  Assert(false);
617  }
618 
619  /* Cleanup */
622 }
#define DEBUG1
Definition: elog.h:25
static bool in_streaming
Definition: pgoutput.c:62
uint32 TransactionId
Definition: c.h:520
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:385
MemoryContext context
Definition: pgoutput.h:20
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
union ReorderBufferChange::@98 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:75
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, bool binary)
Definition: proto.c:259
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary)
Definition: proto.c:141
bool is_publishable_relation(Relation rel)
#define InvalidTransactionId
Definition: transam.h:31
HeapTuple execute_attr_map_tuple(HeapTuple tuple, TupleConversionMap *map)
Definition: tupconvert.c:139
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:938
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:584
#define Assert(condition)
Definition: c.h:745
bool binary
Definition: pgoutput.h:27
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:597
#define elog(elevel,...)
Definition: elog.h:214
StringInfo out
Definition: logical.h:70
struct ReorderBufferChange::@98::@99 tp
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
Definition: proto.c:185
#define RelationGetRelid(relation)
Definition: rel.h:456
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2004

◆ pgoutput_commit_txn()

static void pgoutput_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 370 of file pgoutput.c.

References logicalrep_write_commit(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

372 {
374 
375  OutputPluginPrepareWrite(ctx, true);
376  logicalrep_write_commit(ctx->out, txn, commit_lsn);
377  OutputPluginWrite(ctx, true);
378 }
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:74
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:610
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:584
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:597
StringInfo out
Definition: logical.h:70

◆ pgoutput_origin_filter()

static bool pgoutput_origin_filter ( LogicalDecodingContext ctx,
RepOriginId  origin_id 
)
static

Definition at line 690 of file pgoutput.c.

Referenced by _PG_output_plugin_init().

692 {
693  return false;
694 }

◆ pgoutput_shutdown()

static void pgoutput_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 703 of file pgoutput.c.

References hash_destroy().

Referenced by _PG_output_plugin_init().

704 {
705  if (RelationSyncCache)
706  {
708  RelationSyncCache = NULL;
709  }
710 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:827
static HTAB * RelationSyncCache
Definition: pgoutput.c:120

◆ pgoutput_startup()

static void pgoutput_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 244 of file pgoutput.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, PGOutputData::binary, CacheMemoryContext, CacheRegisterSyscacheCallback(), PGOutputData::context, LogicalDecodingContext::context, ereport, errcode(), errmsg(), ERROR, in_streaming, init_rel_sync_cache(), list_length(), LOGICALREP_PROTO_MAX_VERSION_NUM, LOGICALREP_PROTO_MIN_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OutputPluginOptions::output_type, palloc0(), parse_output_parameters(), PGOutputData::protocol_version, publication_invalidation_cb(), PGOutputData::publication_names, PUBLICATIONOID, PGOutputData::publications, publications_valid, and LogicalDecodingContext::streaming.

Referenced by _PG_output_plugin_init().

246 {
247  bool enable_streaming = false;
248  PGOutputData *data = palloc0(sizeof(PGOutputData));
249 
250  /* Create our memory context for private allocations. */
252  "logical replication output context",
254 
255  ctx->output_plugin_private = data;
256 
257  /* This plugin uses binary protocol. */
259 
260  /*
261  * This is replication start and not slot initialization.
262  *
263  * Parse and validate options passed by the client.
264  */
265  if (!is_init)
266  {
267  /* Parse the params and ERROR if we see any we don't recognize */
269  &data->protocol_version,
270  &data->publication_names,
271  &data->binary,
272  &enable_streaming);
273 
274  /* Check if we support requested protocol */
276  ereport(ERROR,
277  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
278  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
280 
282  ereport(ERROR,
283  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
284  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
286 
287  if (list_length(data->publication_names) < 1)
288  ereport(ERROR,
289  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
290  errmsg("publication_names parameter missing")));
291 
292  /*
293  * Decide whether to enable streaming. It is disabled by default, in
294  * which case we just update the flag in decoding context. Otherwise
295  * we only allow it with sufficient version of the protocol, and when
296  * the output plugin supports it.
297  */
298  if (!enable_streaming)
299  ctx->streaming = false;
301  ereport(ERROR,
302  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
303  errmsg("requested proto_version=%d does not support streaming, need %d or higher",
305  else if (!ctx->streaming)
306  ereport(ERROR,
307  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
308  errmsg("streaming requested, but not supported by output plugin")));
309 
310  /* Also remember we're currently not streaming any transaction. */
311  in_streaming = false;
312 
313  /* Init publication state. */
314  data->publications = NIL;
315  publications_valid = false;
318  (Datum) 0);
319 
320  /* Initialize relation schema cache. */
322  }
323  else
324  {
325  /* Disable the streaming during the slot initialization mode. */
326  ctx->streaming = false;
327  }
328 }
#define NIL
Definition: pg_list.h:65
static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names, bool *binary, bool *enable_streaming)
Definition: pgoutput.c:159
#define AllocSetContextCreate
Definition: memutils.h:170
static bool publications_valid
Definition: pgoutput.c:61
static bool in_streaming
Definition: pgoutput.c:62
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:736
MemoryContext context
Definition: pgoutput.h:20
int errcode(int sqlerrcode)
Definition: elog.c:610
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: logical.h:35
List * output_plugin_options
Definition: logical.h:58
OutputPluginOutputType output_type
Definition: output_plugin.h:28
#define ERROR
Definition: elog.h:43
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:31
List * publication_names
Definition: pgoutput.h:25
#define LOGICALREP_PROTO_MAX_VERSION_NUM
Definition: logicalproto.h:34
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
void * palloc0(Size size)
Definition: mcxt.c:981
uintptr_t Datum
Definition: postgres.h:367
#define ereport(elevel,...)
Definition: elog.h:144
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:33
bool binary
Definition: pgoutput.h:27
static int list_length(const List *l)
Definition: pg_list.h:169
List * publications
Definition: pgoutput.h:26
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:867
int errmsg(const char *fmt,...)
Definition: elog.c:824
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
uint32 protocol_version
Definition: pgoutput.h:24

◆ pgoutput_stream_abort()

static void pgoutput_stream_abort ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 810 of file pgoutput.c.

References Assert, cleanup_rel_sync_cache(), in_streaming, logicalrep_write_stream_abort(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, ReorderBufferTXN::toptxn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

813 {
814  ReorderBufferTXN *toptxn;
815 
816  /*
817  * The abort should happen outside streaming block, even for streamed
818  * transactions. The transaction has to be marked as streamed, though.
819  */
821 
822  /* determine the toplevel transaction */
823  toptxn = (txn->toptxn) ? txn->toptxn : txn;
824 
825  Assert(rbtxn_is_streamed(toptxn));
826 
827  OutputPluginPrepareWrite(ctx, true);
828  logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
829  OutputPluginWrite(ctx, true);
830 
831  cleanup_rel_sync_cache(toptxn->xid, false);
832 }
static bool in_streaming
Definition: pgoutput.c:62
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid)
Definition: proto.c:849
#define rbtxn_is_streamed(txn)
struct ReorderBufferTXN * toptxn
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:584
#define Assert(condition)
Definition: c.h:745
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:1079
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:597
StringInfo out
Definition: logical.h:70

◆ pgoutput_stream_commit()

static void pgoutput_stream_commit ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 839 of file pgoutput.c.

References Assert, cleanup_rel_sync_cache(), in_streaming, logicalrep_write_stream_commit(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), rbtxn_is_streamed, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

842 {
843  /*
844  * The commit should happen outside streaming block, even for streamed
845  * transactions. The transaction has to be marked as streamed, though.
846  */
849 
851 
852  OutputPluginPrepareWrite(ctx, true);
853  logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
854  OutputPluginWrite(ctx, true);
855 
856  cleanup_rel_sync_cache(txn->xid, true);
857 }
static bool in_streaming
Definition: pgoutput.c:62
#define rbtxn_is_streamed(txn)
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:798
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:610
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:584
#define Assert(condition)
Definition: c.h:745
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:1079
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:597
StringInfo out
Definition: logical.h:70

◆ pgoutput_stream_start()

static void pgoutput_stream_start ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 751 of file pgoutput.c.

References Assert, in_streaming, InvalidRepOriginId, InvalidXLogRecPtr, logicalrep_write_origin(), logicalrep_write_stream_start(), ReorderBufferTXN::origin_id, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, replorigin_by_oid(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

753 {
754  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
755 
756  /* we can't nest streaming of transactions */
758 
759  /*
760  * If we already sent the first stream for this transaction then don't
761  * send the origin id in the subsequent streams.
762  */
763  if (rbtxn_is_streamed(txn))
764  send_replication_origin = false;
765 
766  OutputPluginPrepareWrite(ctx, !send_replication_origin);
768 
769  if (send_replication_origin)
770  {
771  char *origin;
772 
773  /* Message boundary */
774  OutputPluginWrite(ctx, false);
775  OutputPluginPrepareWrite(ctx, true);
776 
777  if (replorigin_by_oid(txn->origin_id, true, &origin))
779  }
780 
781  OutputPluginWrite(ctx, true);
782 
783  /* we're streaming a chunk of transaction now */
784  in_streaming = true;
785 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
static bool in_streaming
Definition: pgoutput.c:62
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:112
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:432
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:755
#define rbtxn_is_streamed(txn)
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:584
#define Assert(condition)
Definition: c.h:745
#define InvalidRepOriginId
Definition: origin.h:33
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:597
StringInfo out
Definition: logical.h:70

◆ pgoutput_stream_stop()

static void pgoutput_stream_stop ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 791 of file pgoutput.c.

References Assert, in_streaming, logicalrep_write_stream_stop(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

793 {
794  /* we should be streaming a trasanction */
796 
797  OutputPluginPrepareWrite(ctx, true);
799  OutputPluginWrite(ctx, true);
800 
801  /* we've stopped streaming a transaction */
802  in_streaming = false;
803 }
static bool in_streaming
Definition: pgoutput.c:62
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:789
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:584
#define Assert(condition)
Definition: c.h:745
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:597
StringInfo out
Definition: logical.h:70

◆ pgoutput_truncate()

static void pgoutput_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 625 of file pgoutput.c.

References PGOutputData::context, ReorderBufferChange::data, get_rel_sync_entry(), i, in_streaming, InvalidTransactionId, is_publishable_relation(), logicalrep_write_truncate(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), palloc0(), RelationData::rd_rel, RelationGetRelid, RelationSyncEntry::relid, ReorderBufferChange::truncate, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

627 {
629  MemoryContext old;
630  RelationSyncEntry *relentry;
631  int i;
632  int nrelids;
633  Oid *relids;
635 
636  /* Remember the xid for the change in streaming mode. See pgoutput_change. */
637  if (in_streaming)
638  xid = change->txn->xid;
639 
640  old = MemoryContextSwitchTo(data->context);
641 
642  relids = palloc0(nrelations * sizeof(Oid));
643  nrelids = 0;
644 
645  for (i = 0; i < nrelations; i++)
646  {
647  Relation relation = relations[i];
648  Oid relid = RelationGetRelid(relation);
649 
650  if (!is_publishable_relation(relation))
651  continue;
652 
653  relentry = get_rel_sync_entry(data, relid);
654 
655  if (!relentry->pubactions.pubtruncate)
656  continue;
657 
658  /*
659  * Don't send partitions if the publication wants to send only the
660  * root tables through it.
661  */
662  if (relation->rd_rel->relispartition &&
663  relentry->publish_as_relid != relid)
664  continue;
665 
666  relids[nrelids++] = relid;
667  maybe_send_schema(ctx, txn, change, relation, relentry);
668  }
669 
670  if (nrelids > 0)
671  {
672  OutputPluginPrepareWrite(ctx, true);
674  xid,
675  nrelids,
676  relids,
677  change->data.truncate.cascade,
678  change->data.truncate.restart_seqs);
679  OutputPluginWrite(ctx, true);
680  }
681 
684 }
static bool in_streaming
Definition: pgoutput.c:62
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:311
uint32 TransactionId
Definition: c.h:520
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:385
MemoryContext context
Definition: pgoutput.h:20
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:86
union ReorderBufferChange::@98 data
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:75
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:137
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
bool is_publishable_relation(Relation rel)
#define InvalidTransactionId
Definition: transam.h:31
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:938
void * palloc0(Size size)
Definition: mcxt.c:981
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:584
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:597
struct ReorderBufferChange::@98::@100 truncate
StringInfo out
Definition: logical.h:70
int i
#define RelationGetRelid(relation)
Definition: rel.h:456

◆ publication_invalidation_cb()

static void publication_invalidation_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 736 of file pgoutput.c.

References publications_valid, and rel_sync_cache_publication_cb().

Referenced by pgoutput_startup().

737 {
738  publications_valid = false;
739 
740  /*
741  * Also invalidate per-relation cache so that next time the filtering info
742  * is checked it will be updated with the new publication settings.
743  */
744  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
745 }
static bool publications_valid
Definition: pgoutput.c:61
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1157
void * arg

◆ rel_sync_cache_publication_cb()

static void rel_sync_cache_publication_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1157 of file pgoutput.c.

References hash_seq_init(), hash_seq_search(), RelationSyncEntry::replicate_valid, and status().

Referenced by init_rel_sync_cache(), and publication_invalidation_cb().

1158 {
1160  RelationSyncEntry *entry;
1161 
1162  /*
1163  * We can get here if the plugin was used in SQL interface as the
1164  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
1165  * is no way to unregister the relcache invalidation callback.
1166  */
1167  if (RelationSyncCache == NULL)
1168  return;
1169 
1170  /*
1171  * There is no way to find which entry in our cache the hash belongs to so
1172  * mark the whole cache as invalid.
1173  */
1174  hash_seq_init(&status, RelationSyncCache);
1175  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
1176  entry->replicate_valid = false;
1177 }
bool replicate_valid
Definition: pgoutput.c:99
static HTAB * RelationSyncCache
Definition: pgoutput.c:120
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1401
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1391
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227

◆ rel_sync_cache_relation_cb()

static void rel_sync_cache_relation_cb ( Datum  arg,
Oid  relid 
)
static

Definition at line 1115 of file pgoutput.c.

References HASH_FIND, hash_search(), list_free(), NIL, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by init_rel_sync_cache().

1116 {
1117  RelationSyncEntry *entry;
1118 
1119  /*
1120  * We can get here if the plugin was used in SQL interface as the
1121  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
1122  * is no way to unregister the relcache invalidation callback.
1123  */
1124  if (RelationSyncCache == NULL)
1125  return;
1126 
1127  /*
1128  * Nobody keeps pointers to entries in this hash table around outside
1129  * logical decoding callback calls - but invalidation events can come in
1130  * *during* a callback if we access the relcache in the callback. Because
1131  * of that we must mark the cache entry as invalid but not remove it from
1132  * the hash while it could still be referenced, then prune it at a later
1133  * safe point.
1134  *
1135  * Getting invalidations for relations that aren't in the table is
1136  * entirely normal, since there's no way to unregister for an invalidation
1137  * event. So we don't care if it's found or not.
1138  */
1139  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
1140  HASH_FIND, NULL);
1141 
1142  /*
1143  * Reset schema sent status as the relation definition may have changed.
1144  */
1145  if (entry != NULL)
1146  {
1147  entry->schema_sent = false;
1148  list_free(entry->streamed_txns);
1149  entry->streamed_txns = NIL;
1150  }
1151 }
List * streamed_txns
Definition: pgoutput.c:96
#define NIL
Definition: pg_list.h:65
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:919
static HTAB * RelationSyncCache
Definition: pgoutput.c:120
void list_free(List *list)
Definition: list.c:1376

◆ send_relation_and_attrs()

static void send_relation_and_attrs ( Relation  relation,
TransactionId  xid,
LogicalDecodingContext ctx 
)
static

Definition at line 458 of file pgoutput.c.

References FirstGenbkiObjectId, i, logicalrep_write_rel(), logicalrep_write_typ(), TupleDescData::natts, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetDescr, and TupleDescAttr.

Referenced by maybe_send_schema().

460 {
461  TupleDesc desc = RelationGetDescr(relation);
462  int i;
463 
464  /*
465  * Write out type info if needed. We do that only for user-created types.
466  * We use FirstGenbkiObjectId as the cutoff, so that we only consider
467  * objects with hand-assigned OIDs to be "built in", not for instance any
468  * function or type defined in the information_schema. This is important
469  * because only hand-assigned OIDs can be expected to remain stable across
470  * major versions.
471  */
472  for (i = 0; i < desc->natts; i++)
473  {
474  Form_pg_attribute att = TupleDescAttr(desc, i);
475 
476  if (att->attisdropped || att->attgenerated)
477  continue;
478 
479  if (att->atttypid < FirstGenbkiObjectId)
480  continue;
481 
482  OutputPluginPrepareWrite(ctx, false);
483  logicalrep_write_typ(ctx->out, xid, att->atttypid);
484  OutputPluginWrite(ctx, false);
485  }
486 
487  OutputPluginPrepareWrite(ctx, false);
488  logicalrep_write_rel(ctx->out, xid, relation);
489  OutputPluginWrite(ctx, false);
490 }
#define RelationGetDescr(relation)
Definition: rel.h:482
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
Definition: proto.c:368
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:584
#define FirstGenbkiObjectId
Definition: transam.h:188
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:597
StringInfo out
Definition: logical.h:70
int i
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:422

◆ set_schema_sent_in_streamed_txn()

static void set_schema_sent_in_streamed_txn ( RelationSyncEntry entry,
TransactionId  xid 
)
static

Definition at line 917 of file pgoutput.c.

References CacheMemoryContext, lappend_int(), MemoryContextSwitchTo(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

918 {
919  MemoryContext oldctx;
920 
922 
923  entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
924 
925  MemoryContextSwitchTo(oldctx);
926 }
List * streamed_txns
Definition: pgoutput.c:96
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
List * lappend_int(List *list, int datum)
Definition: list.c:339
MemoryContext CacheMemoryContext
Definition: mcxt.c:47

Variable Documentation

◆ in_streaming

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 31 of file pgoutput.c.

◆ publications_valid

bool publications_valid
static

Definition at line 61 of file pgoutput.c.

Referenced by get_rel_sync_entry(), pgoutput_startup(), and publication_invalidation_cb().

◆ RelationSyncCache

HTAB* RelationSyncCache = NULL
static

Definition at line 120 of file pgoutput.c.