PostgreSQL Source Code  git master
pgoutput.c File Reference
#include "postgres.h"
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "commands/defrem.h"
#include "fmgr.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
#include "utils/int8.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
Include dependency graph for pgoutput.c:

Go to the source code of this file.

Data Structures

struct  RelationSyncEntry
 

Typedefs

typedef struct RelationSyncEntry RelationSyncEntry
 

Functions

void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void pgoutput_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pgoutput_shutdown (LogicalDecodingContext *ctx)
 
static void pgoutput_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
 
static void pgoutput_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static void pgoutput_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pgoutput_origin_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pgoutput_stream_start (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_stop (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_abort (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pgoutput_stream_commit (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static List * LoadPublications (List *pubnames)
 
static void publication_invalidation_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void send_relation_and_attrs (Relation relation, TransactionId xid, LogicalDecodingContext *ctx)
 
static void init_rel_sync_cache (MemoryContext decoding_context)
 
static void cleanup_rel_sync_cache (TransactionId xid, bool is_commit)
 
static RelationSyncEntry * get_rel_sync_entry (PGOutputData *data, Oid relid)
 
static void rel_sync_cache_relation_cb (Datum arg, Oid relid)
 
static void rel_sync_cache_publication_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void set_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static bool get_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static void parse_output_parameters (List *options, PGOutputData *data)
 
static void maybe_send_schema (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
 

Variables

 PG_MODULE_MAGIC
 
static bool publications_valid
 
static bool in_streaming
 
static HTAB * RelationSyncCache = NULL
 

Typedef Documentation

◆ RelationSyncEntry

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks * cb)

Definition at line 141 of file pgoutput.c.

References AssertVariableIsOfType, OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::message_cb, pgoutput_begin_txn(), pgoutput_change(), pgoutput_commit_txn(), pgoutput_message(), pgoutput_origin_filter(), pgoutput_shutdown(), pgoutput_startup(), pgoutput_stream_abort(), pgoutput_stream_commit(), pgoutput_stream_start(), pgoutput_stream_stop(), pgoutput_truncate(), OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

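142 {
143  AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
144 
145  cb->startup_cb = pgoutput_startup;
146  cb->begin_cb = pgoutput_begin_txn;
147  cb->change_cb = pgoutput_change;
148  cb->truncate_cb = pgoutput_truncate;
149  cb->message_cb = pgoutput_message;
150  cb->commit_cb = pgoutput_commit_txn;
151  cb->filter_by_origin_cb = pgoutput_origin_filter;
152  cb->shutdown_cb = pgoutput_shutdown;
153 
154  /* transaction streaming */
155  cb->stream_start_cb = pgoutput_stream_start;
156  cb->stream_stop_cb = pgoutput_stream_stop;
157  cb->stream_abort_cb = pgoutput_stream_abort;
158  cb->stream_commit_cb = pgoutput_stream_commit;
159  cb->stream_change_cb = pgoutput_change;
160  cb->stream_message_cb = pgoutput_message;
161  cb->stream_truncate_cb = pgoutput_truncate;
162 }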
LogicalDecodeTruncateCB truncate_cb
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:512
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:141
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:804
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:756
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:384
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamMessageCB stream_message_cb
static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: pgoutput.c:711
LogicalDecodeStreamAbortCB stream_abort_cb
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:893
LogicalDecodeCommitCB commit_cb
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:261
LogicalDecodeChangeCB change_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:743
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeStreamStartCB stream_start_cb
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:845
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamStopCB stream_stop_cb
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:649
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:346
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeStreamChangeCB stream_change_cb
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: pgoutput.c:864
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:963

◆ cleanup_rel_sync_cache()

static void cleanup_rel_sync_cache ( TransactionId  xid,
bool  is_commit 
)
static

Definition at line 1129 of file pgoutput.c.

References Assert, foreach_delete_current, hash_seq_init(), hash_seq_search(), lfirst_int, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_stream_abort(), and pgoutput_stream_commit().

1130 {
1131  HASH_SEQ_STATUS hash_seq;
1132  RelationSyncEntry *entry;
1133  ListCell *lc;
1134 
1135  Assert(RelationSyncCache != NULL);
1136 
1137  hash_seq_init(&hash_seq, RelationSyncCache);
1138  while ((entry = hash_seq_search(&hash_seq)) != NULL)
1139  {
1140  /*
1141  * We can set the schema_sent flag for an entry that has committed xid
1142  * in the list as that ensures that the subscriber would have the
1143  * corresponding schema and we don't need to send it unless there is
1144  * any invalidation for that relation.
1145  */
1146  foreach(lc, entry->streamed_txns)
1147  {
1148  if (xid == (uint32) lfirst_int(lc))
1149  {
1150  if (is_commit)
1151  entry->schema_sent = true;
1152 
1153  entry->streamed_txns =
1154  foreach_delete_current(entry->streamed_txns, lc);
1155  break;
1156  }
1157  }
1158  }
1159 }
List * streamed_txns
Definition: pgoutput.c:100
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:369
#define lfirst_int(lc)
Definition: pg_list.h:170
unsigned int uint32
Definition: c.h:441
static HTAB * RelationSyncCache
Definition: pgoutput.c:124
#define Assert(condition)
Definition: c.h:804
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426

◆ get_rel_sync_entry()

static RelationSyncEntry * get_rel_sync_entry ( PGOutputData * data,
Oid  relid 
)
static

Definition at line 988 of file pgoutput.c.

References Publication::alltables, Assert, CacheMemoryContext, get_partition_ancestors(), get_rel_relispartition(), get_rel_relkind(), GetRelationPublications(), HASH_ENTER, hash_search(), InvalidOid, lfirst, lfirst_oid, list_free(), list_free_deep(), list_member_oid(), llast_oid, LoadPublications(), MemoryContextSwitchTo(), NIL, Publication::oid, Publication::pubactions, RelationSyncEntry::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, PGOutputData::publication_names, PGOutputData::publications, publications_valid, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, PublicationActions::pubupdate, Publication::pubviaroot, RelationSyncEntry::relid, RelationSyncEntry::replicate_valid, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_change(), and pgoutput_truncate().

989 {
990  RelationSyncEntry *entry;
991  bool am_partition = get_rel_relispartition(relid);
992  char relkind = get_rel_relkind(relid);
993  bool found;
994  MemoryContext oldctx;
995 
996  Assert(RelationSyncCache != NULL);
997 
998  /* Find cached relation info, creating if not found */
999  entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
1000  (void *) &relid,
1001  HASH_ENTER, &found);
1002  Assert(entry != NULL);
1003 
1004  /* Not found means schema wasn't sent */
1005  if (!found)
1006  {
1007  /* immediately make a new entry valid enough to satisfy callbacks */
1008  entry->schema_sent = false;
1009  entry->streamed_txns = NIL;
1010  entry->replicate_valid = false;
1011  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
1012  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
1013  entry->publish_as_relid = InvalidOid;
1014  }
1015 
1016  /* Validate the entry */
1017  if (!entry->replicate_valid)
1018  {
1019  List *pubids = GetRelationPublications(relid);
1020  ListCell *lc;
1021  Oid publish_as_relid = relid;
1022 
1023  /* Reload publications if needed before use. */
1024  if (!publications_valid)
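1025  {
1026  oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1027  if (data->publications)
1028  list_free_deep(data->publications);
1029 
1030  data->publications = LoadPublications(data->publication_names);
1031  MemoryContextSwitchTo(oldctx);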
1032  publications_valid = true;
1033  }
1034 
1035  /*
1036  * Build publication cache. We can't use one provided by relcache as
1037  * relcache considers all publications given relation is in, but here
1038  * we only need to consider ones that the subscriber requested.
1039  */
1040  foreach(lc, data->publications)
1041  {
1042  Publication *pub = lfirst(lc);
1043  bool publish = false;
1044 
1045  if (pub->alltables)
1046  {
1047  publish = true;
1048  if (pub->pubviaroot && am_partition)
1049  publish_as_relid = llast_oid(get_partition_ancestors(relid));
1050  }
1051 
1052  if (!publish)
1053  {
1054  bool ancestor_published = false;
1055 
1056  /*
1057  * For a partition, check if any of the ancestors are
1058  * published. If so, note down the topmost ancestor that is
1059  * published via this publication, which will be used as the
1060  * relation via which to publish the partition's changes.
1061  */
1062  if (am_partition)
1063  {
1064  List *ancestors = get_partition_ancestors(relid);
1065  ListCell *lc2;
1066 
1067  /*
1068  * Find the "topmost" ancestor that is in this
1069  * publication.
1070  */
1071  foreach(lc2, ancestors)
1072  {
1073  Oid ancestor = lfirst_oid(lc2);
1074 
1075  if (list_member_oid(GetRelationPublications(ancestor),
1076  pub->oid))
1077  {
1078  ancestor_published = true;
1079  if (pub->pubviaroot)
1080  publish_as_relid = ancestor;
1081  }
1082  }
1083  }
1084 
1085  if (list_member_oid(pubids, pub->oid) || ancestor_published)
1086  publish = true;
1087  }
1088 
1089  /*
1090  * Don't publish changes for partitioned tables, because
1091  * publishing those of its partitions suffices, unless partition
1092  * changes won't be published due to pubviaroot being set.
1093  */
1094  if (publish &&
1095  (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
1096  {
1097  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
1098  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
1099  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
1100  entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
1101  }
1102 
1103  if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
1104  entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
1105  break;
1106  }
1107 
1108  list_free(pubids);
1109 
1110  entry->publish_as_relid = publish_as_relid;
1111  entry->replicate_valid = true;
1112  }
1113 
1114  return entry;
1115 }
List * streamed_txns
Definition: pgoutput.c:100
#define NIL
Definition: pg_list.h:65
PublicationActions pubactions
static bool publications_valid
Definition: pgoutput.c:65
bool replicate_valid
Definition: pgoutput.c:103
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1974
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define llast_oid(l)
Definition: pg_list.h:196
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
unsigned int Oid
Definition: postgres_ext.h:31
void list_free_deep(List *list)
Definition: list.c:1405
List * GetRelationPublications(Oid relid)
List * publication_names
Definition: pgoutput.h:25
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:1998
static HTAB * RelationSyncCache
Definition: pgoutput.c:124
#define InvalidOid
Definition: postgres_ext.h:36
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:689
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:769
List * publications
Definition: pgoutput.h:26
void list_free(List *list)
Definition: list.c:1391
Definition: pg_list.h:50
List * get_partition_ancestors(Oid relid)
Definition: partition.c:133
#define lfirst_oid(lc)
Definition: pg_list.h:171
MemoryContext CacheMemoryContext
Definition: mcxt.c:51
PublicationActions pubactions
Definition: pgoutput.c:104

◆ get_schema_sent_in_streamed_txn()

static bool get_schema_sent_in_streamed_txn ( RelationSyncEntry * entry,
TransactionId  xid 
)
static

Definition at line 949 of file pgoutput.c.

References lfirst_int, and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

950 {
951  ListCell *lc;
952 
953  foreach(lc, entry->streamed_txns)
954  {
955  if (xid == (uint32) lfirst_int(lc))
956  return true;
957  }
958 
959  return false;
960 }
List * streamed_txns
Definition: pgoutput.c:100
#define lfirst_int(lc)
Definition: pg_list.h:170
unsigned int uint32
Definition: c.h:441

◆ init_rel_sync_cache()

static void init_rel_sync_cache ( MemoryContext  decoding_context)
static

Definition at line 921 of file pgoutput.c.

References Assert, CacheRegisterRelcacheCallback(), CacheRegisterSyscacheCallback(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, PUBLICATIONRELMAP, rel_sync_cache_publication_cb(), and rel_sync_cache_relation_cb().

Referenced by pgoutput_startup().

922 {
923  HASHCTL ctl;
924 
925  if (RelationSyncCache != NULL)
926  return;
927 
928  /* Make a new hash table for the cache */
929  ctl.keysize = sizeof(Oid);
930  ctl.entrysize = sizeof(RelationSyncEntry);
931  ctl.hcxt = cachectx;
932 
933  RelationSyncCache = hash_create("logical replication output relation cache",
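934  128, &ctl,
935  HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
936 
937  Assert(RelationSyncCache != NULL);
938 
939  CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
940  CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
941  rel_sync_cache_publication_cb,
942  (Datum) 0);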
943 }
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
MemoryContext hcxt
Definition: hsearch.h:86
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:1165
Size entrysize
Definition: hsearch.h:76
unsigned int Oid
Definition: postgres_ext.h:31
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1207
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1476
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
static HTAB * RelationSyncCache
Definition: pgoutput.c:124
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
uintptr_t Datum
Definition: postgres.h:411
Size keysize
Definition: hsearch.h:75
#define Assert(condition)
Definition: c.h:804

◆ LoadPublications()

static List * LoadPublications ( List * pubnames)
static

Definition at line 769 of file pgoutput.c.

References GetPublicationByName(), lappend(), lfirst, and NIL.

Referenced by get_rel_sync_entry().

770 {
771  List *result = NIL;
772  ListCell *lc;
773 
774  foreach(lc, pubnames)
775  {
776  char *pubname = (char *) lfirst(lc);
777  Publication *pub = GetPublicationByName(pubname, false);
778 
779  result = lappend(result, pub);
780  }
781 
782  return result;
783 }
#define NIL
Definition: pg_list.h:65
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
List * lappend(List *list, void *datum)
Definition: list.c:336
#define lfirst(lc)
Definition: pg_list.h:169
Definition: pg_list.h:50

◆ maybe_send_schema()

static void maybe_send_schema ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
ReorderBufferChange * change,
Relation  relation,
RelationSyncEntry * relentry 
)
static

Definition at line 399 of file pgoutput.c.

References CacheMemoryContext, convert_tuples_by_name(), CreateTupleDescCopy(), get_schema_sent_in_streamed_txn(), in_streaming, InvalidTransactionId, RelationSyncEntry::map, MemoryContextSwitchTo(), RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), RelationSyncEntry::schema_sent, send_relation_and_attrs(), set_schema_sent_in_streamed_txn(), ReorderBufferTXN::toptxn, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by pgoutput_change(), and pgoutput_truncate().

402 {
403  bool schema_sent;
404  TransactionId xid = InvalidTransactionId;
405  TransactionId topxid = InvalidTransactionId;
406 
407  /*
408  * Remember XID of the (sub)transaction for the change. We don't care if
409  * it's top-level transaction or not (we have already sent that XID in
410  * start of the current streaming block).
411  *
412  * If we're not in a streaming block, just use InvalidTransactionId and
413  * the write methods will not include it.
414  */
415  if (in_streaming)
416  xid = change->txn->xid;
417 
418  if (change->txn->toptxn)
419  topxid = change->txn->toptxn->xid;
420  else
421  topxid = xid;
422 
423  /*
424  * Do we need to send the schema? We do track streamed transactions
425  * separately, because those may be applied later (and the regular
426  * transactions won't see their effects until then) and in an order that
427  * we don't know at this point.
428  *
429  * XXX There is a scope of optimization here. Currently, we always send
430  * the schema first time in a streaming transaction but we can probably
431  * avoid that by checking 'relentry->schema_sent' flag. However, before
432  * doing that we need to study its impact on the case where we have a mix
433  * of streaming and non-streaming transactions.
434  */
435  if (in_streaming)
436  schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
437  else
438  schema_sent = relentry->schema_sent;
439 
440  if (schema_sent)
441  return;
442 
443  /* If needed, send the ancestor's schema first. */
444  if (relentry->publish_as_relid != RelationGetRelid(relation))
445  {
446  Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
447  TupleDesc indesc = RelationGetDescr(relation);
448  TupleDesc outdesc = RelationGetDescr(ancestor);
449  MemoryContext oldctx;
450 
451  /* Map must live as long as the session does. */
452  oldctx = MemoryContextSwitchTo(CacheMemoryContext);
453  relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
454  CreateTupleDescCopy(outdesc));
455  MemoryContextSwitchTo(oldctx);
456  send_relation_and_attrs(ancestor, xid, ctx);
457  RelationClose(ancestor);
458  }
459 
460  send_relation_and_attrs(relation, xid, ctx);
461 
462  if (in_streaming)
463  set_schema_sent_in_streamed_txn(relentry, topxid);
464  else
465  relentry->schema_sent = true;
466 }
TupleDesc CreateTupleDescCopy(TupleDesc tupdesc)
Definition: tupdesc.c:111
static bool in_streaming
Definition: pgoutput.c:66
uint32 TransactionId
Definition: c.h:587
#define RelationGetDescr(relation)
Definition: rel.h:483
TupleConversionMap * map
Definition: pgoutput.c:120
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx)
Definition: pgoutput.c:472
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
#define InvalidTransactionId
Definition: transam.h:31
struct ReorderBufferTXN * toptxn
void RelationClose(Relation relation)
Definition: relcache.c:2096
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:949
TransactionId xid
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:967
#define RelationGetRelid(relation)
Definition: rel.h:457
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1990
MemoryContext CacheMemoryContext
Definition: mcxt.c:51

◆ parse_output_parameters()

static void parse_output_parameters ( List * options,
PGOutputData * data 
)
static

Definition at line 165 of file pgoutput.c.

References DefElem::arg, Assert, PGOutputData::binary, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, IsA, lfirst, PGOutputData::messages, PG_UINT32_MAX, PGOutputData::protocol_version, PGOutputData::publication_names, scanint8(), SplitIdentifierString(), PGOutputData::streaming, and strVal.

Referenced by pgoutput_startup().

166 {
167  ListCell *lc;
168  bool protocol_version_given = false;
169  bool publication_names_given = false;
170  bool binary_option_given = false;
171  bool messages_option_given = false;
172  bool streaming_given = false;
173 
174  data->binary = false;
175  data->streaming = false;
176  data->messages = false;
177 
178  foreach(lc, options)
179  {
180  DefElem *defel = (DefElem *) lfirst(lc);
181 
182  Assert(defel->arg == NULL || IsA(defel->arg, String));
183 
184  /* Check each param, whether or not we recognize it */
185  if (strcmp(defel->defname, "proto_version") == 0)
186  {
187  int64 parsed;
188 
189  if (protocol_version_given)
190  ereport(ERROR,
191  (errcode(ERRCODE_SYNTAX_ERROR),
192  errmsg("conflicting or redundant options")));
193  protocol_version_given = true;
194 
195  if (!scanint8(strVal(defel->arg), true, &parsed))
196  ereport(ERROR,
197  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
198  errmsg("invalid proto_version")));
199 
200  if (parsed > PG_UINT32_MAX || parsed < 0)
201  ereport(ERROR,
202  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
203  errmsg("proto_version \"%s\" out of range",
204  strVal(defel->arg))));
205 
206  data->protocol_version = (uint32) parsed;
207  }
208  else if (strcmp(defel->defname, "publication_names") == 0)
209  {
210  if (publication_names_given)
211  ereport(ERROR,
212  (errcode(ERRCODE_SYNTAX_ERROR),
213  errmsg("conflicting or redundant options")));
214  publication_names_given = true;
215 
216  if (!SplitIdentifierString(strVal(defel->arg), ',',
217  &data->publication_names))
218  ereport(ERROR,
219  (errcode(ERRCODE_INVALID_NAME),
220  errmsg("invalid publication_names syntax")));
221  }
222  else if (strcmp(defel->defname, "binary") == 0)
223  {
224  if (binary_option_given)
225  ereport(ERROR,
226  (errcode(ERRCODE_SYNTAX_ERROR),
227  errmsg("conflicting or redundant options")));
228  binary_option_given = true;
229 
230  data->binary = defGetBoolean(defel);
231  }
232  else if (strcmp(defel->defname, "messages") == 0)
233  {
234  if (messages_option_given)
235  ereport(ERROR,
236  (errcode(ERRCODE_SYNTAX_ERROR),
237  errmsg("conflicting or redundant options")));
238  messages_option_given = true;
239 
240  data->messages = defGetBoolean(defel);
241  }
242  else if (strcmp(defel->defname, "streaming") == 0)
243  {
244  if (streaming_given)
245  ereport(ERROR,
246  (errcode(ERRCODE_SYNTAX_ERROR),
247  errmsg("conflicting or redundant options")));
248  streaming_given = true;
249 
250  data->streaming = defGetBoolean(defel);
251  }
252  else
253  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
254  }
255 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:590
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:698
#define PG_UINT32_MAX
Definition: c.h:525
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:46
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3753
bool messages
Definition: pgoutput.h:29
List * publication_names
Definition: pgoutput.h:25
unsigned int uint32
Definition: c.h:441
Node * arg
Definition: parsenodes.h:747
#define ereport(elevel,...)
Definition: elog.h:157
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
bool binary
Definition: pgoutput.h:27
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
char * defname
Definition: parsenodes.h:746
bool streaming
Definition: pgoutput.h:28
bool scanint8(const char *str, bool errorOK, int64 *result)
Definition: int8.c:55
uint32 protocol_version
Definition: pgoutput.h:24

◆ pgoutput_begin_txn()

static void pgoutput_begin_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn 
)
static

Definition at line 346 of file pgoutput.c.

References InvalidRepOriginId, logicalrep_write_begin(), logicalrep_write_origin(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and replorigin_by_oid().

Referenced by _PG_output_plugin_init().

347 {
348  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
349 
350  OutputPluginPrepareWrite(ctx, !send_replication_origin);
351  logicalrep_write_begin(ctx->out, txn);
352 
353  if (send_replication_origin)
354  {
355  char *origin;
356 
357  /*----------
358  * XXX: which behaviour do we want here?
359  *
360  * Alternatives:
361  * - don't send origin message if origin name not found
362  * (that's what we do now)
363  * - throw error - that will break replication, not good
364  * - send some special "unknown" origin
365  *----------
366  */
367  if (replorigin_by_oid(txn->origin_id, true, &origin))
368  {
369  /* Message boundary */
370  OutputPluginWrite(ctx, false);
371  OutputPluginPrepareWrite(ctx, true);
372  logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
373  }
374 
375  }
376 
377  OutputPluginWrite(ctx, true);
378 }
RepOriginId origin_id
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:113
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:449
XLogRecPtr origin_lsn
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:46
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:639
#define InvalidRepOriginId
Definition: origin.h:33
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:652
StringInfo out
Definition: logical.h:73

◆ pgoutput_change()

static void pgoutput_change ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
Relation  rel,
ReorderBufferChange * change 
)
static

Definition at line 512 of file pgoutput.c.

References ReorderBufferChange::action, Assert, PGOutputData::binary, PGOutputData::context, ReorderBufferChange::data, DEBUG1, elog, execute_attr_map_tuple(), get_rel_sync_entry(), in_streaming, InvalidTransactionId, is_publishable_relation(), logicalrep_write_delete(), logicalrep_write_insert(), logicalrep_write_update(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationData::rd_rel, RelationClose(), RelationGetRelid, RelationIdGetRelation(), RelationIsValid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

514 {
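515  PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
516  MemoryContext old;
517  RelationSyncEntry *relentry;
518  TransactionId xid = InvalidTransactionId;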
519  Relation ancestor = NULL;
520 
521  if (!is_publishable_relation(relation))
522  return;
523 
524  /*
525  * Remember the xid for the change in streaming mode. We need to send xid
526  * with each change in the streaming mode so that subscriber can make
527  * their association and on aborts, it can discard the corresponding
528  * changes.
529  */
530  if (in_streaming)
531  xid = change->txn->xid;
532 
533  relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
534 
535  /* First check the table filter */
536  switch (change->action)
537  {
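538  case REORDER_BUFFER_CHANGE_INSERT:
539  if (!relentry->pubactions.pubinsert)
540  return;
541  break;
542  case REORDER_BUFFER_CHANGE_UPDATE:
543  if (!relentry->pubactions.pubupdate)
544  return;
545  break;
546  case REORDER_BUFFER_CHANGE_DELETE:
547  if (!relentry->pubactions.pubdelete)
548  return;
549  break;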
550  default:
551  Assert(false);
552  }
553 
554  /* Avoid leaking memory by using and resetting our own context */
555  old = MemoryContextSwitchTo(data->context);
556 
557  maybe_send_schema(ctx, txn, change, relation, relentry);
558 
559  /* Send the data */
560  switch (change->action)
561  {
562  case REORDER_BUFFER_CHANGE_INSERT:
563  {
564  HeapTuple tuple = &change->data.tp.newtuple->tuple;
565 
566  /* Switch relation if publishing via root. */
567  if (relentry->publish_as_relid != RelationGetRelid(relation))
568  {
569  Assert(relation->rd_rel->relispartition);
570  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
571  relation = ancestor;
572  /* Convert tuple if needed. */
573  if (relentry->map)
574  tuple = execute_attr_map_tuple(tuple, relentry->map);
575  }
576 
577  OutputPluginPrepareWrite(ctx, true);
578  logicalrep_write_insert(ctx->out, xid, relation, tuple,
579  data->binary);
580  OutputPluginWrite(ctx, true);
581  break;
582  }
583  case REORDER_BUFFER_CHANGE_UPDATE:
584  {
585  HeapTuple oldtuple = change->data.tp.oldtuple ?
586  &change->data.tp.oldtuple->tuple : NULL;
587  HeapTuple newtuple = &change->data.tp.newtuple->tuple;
588 
589  /* Switch relation if publishing via root. */
590  if (relentry->publish_as_relid != RelationGetRelid(relation))
591  {
592  Assert(relation->rd_rel->relispartition);
593  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
594  relation = ancestor;
595  /* Convert tuples if needed. */
596  if (relentry->map)
597  {
598  oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
599  newtuple = execute_attr_map_tuple(newtuple, relentry->map);
600  }
601  }
602 
603  OutputPluginPrepareWrite(ctx, true);
604  logicalrep_write_update(ctx->out, xid, relation, oldtuple,
605  newtuple, data->binary);
606  OutputPluginWrite(ctx, true);
607  break;
608  }
609  case REORDER_BUFFER_CHANGE_DELETE:
610  if (change->data.tp.oldtuple)
611  {
612  HeapTuple oldtuple = &change->data.tp.oldtuple->tuple;
613 
614  /* Switch relation if publishing via root. */
615  if (relentry->publish_as_relid != RelationGetRelid(relation))
616  {
617  Assert(relation->rd_rel->relispartition);
618  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
619  relation = ancestor;
620  /* Convert tuple if needed. */
621  if (relentry->map)
622  oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
623  }
624 
625  OutputPluginPrepareWrite(ctx, true);
626  logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
627  data->binary);
628  OutputPluginWrite(ctx, true);
629  }
630  else
631  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
632  break;
633  default:
634  Assert(false);
635  }
636 
637  if (RelationIsValid(ancestor))
638  {
639  RelationClose(ancestor);
640  ancestor = NULL;
641  }
642 
643  /* Cleanup */
644  MemoryContextSwitchTo(old);
645  MemoryContextReset(data->context);
646 }
#define DEBUG1
Definition: elog.h:25
static bool in_streaming
Definition: pgoutput.c:66
uint32 TransactionId
Definition: c.h:587
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:399
MemoryContext context
Definition: pgoutput.h:20
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:78
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, bool binary)
Definition: proto.c:260
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:84
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary)
Definition: proto.c:142
#define RelationIsValid(relation)
Definition: rel.h:430
bool is_publishable_relation(Relation rel)
#define InvalidTransactionId
Definition: transam.h:31
void RelationClose(Relation relation)
Definition: relcache.c:2096
HeapTuple execute_attr_map_tuple(HeapTuple tuple, TupleConversionMap *map)
Definition: tupconvert.c:139
union ReorderBufferChange::@97 data
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:988
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:639
#define Assert(condition)
Definition: c.h:804
bool binary
Definition: pgoutput.h:27
struct ReorderBufferChange::@97::@98 tp
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:652
#define elog(elevel,...)
Definition: elog.h:232
StringInfo out
Definition: logical.h:73
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
Definition: proto.c:186
#define RelationGetRelid(relation)
Definition: rel.h:457
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1990

◆ pgoutput_commit_txn()

static void pgoutput_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 384 of file pgoutput.c.

References logicalrep_write_commit(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

386 {
388 
389  OutputPluginPrepareWrite(ctx, true);
390  logicalrep_write_commit(ctx->out, txn, commit_lsn);
391  OutputPluginWrite(ctx, true);
392 }
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:75
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:665
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:639
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:652
StringInfo out
Definition: logical.h:73

◆ pgoutput_message()

static void pgoutput_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 711 of file pgoutput.c.

References in_streaming, InvalidTransactionId, logicalrep_write_message(), PGOutputData::messages, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

714 {
717 
718  if (!data->messages)
719  return;
720 
721  /*
722  * Remember the xid for the message in streaming mode. See
723  * pgoutput_change.
724  */
725  if (in_streaming)
726  xid = txn->xid;
727 
728  OutputPluginPrepareWrite(ctx, true);
730  xid,
731  message_lsn,
732  transactional,
733  prefix,
734  sz,
735  message);
736  OutputPluginWrite(ctx, true);
737 }
static bool in_streaming
Definition: pgoutput.c:66
uint32 TransactionId
Definition: c.h:587
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:369
void * output_plugin_private
Definition: logical.h:78
bool messages
Definition: pgoutput.h:29
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:639
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:652
StringInfo out
Definition: logical.h:73

◆ pgoutput_origin_filter()

static bool pgoutput_origin_filter ( LogicalDecodingContext ctx,
RepOriginId  origin_id 
)
static

Definition at line 743 of file pgoutput.c.

Referenced by _PG_output_plugin_init().

745 {
746  return false;
747 }

◆ pgoutput_shutdown()

static void pgoutput_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 756 of file pgoutput.c.

References hash_destroy().

Referenced by _PG_output_plugin_init().

757 {
758  if (RelationSyncCache)
759  {
761  RelationSyncCache = NULL;
762  }
763 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:862
static HTAB * RelationSyncCache
Definition: pgoutput.c:124

◆ pgoutput_startup()

static void pgoutput_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 261 of file pgoutput.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CacheMemoryContext, CacheRegisterSyscacheCallback(), PGOutputData::context, LogicalDecodingContext::context, ereport, errcode(), errmsg(), ERROR, in_streaming, init_rel_sync_cache(), list_length(), LOGICALREP_PROTO_MAX_VERSION_NUM, LOGICALREP_PROTO_MIN_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OutputPluginOptions::output_type, palloc0(), parse_output_parameters(), PGOutputData::protocol_version, publication_invalidation_cb(), PGOutputData::publication_names, PUBLICATIONOID, PGOutputData::publications, publications_valid, PGOutputData::streaming, and LogicalDecodingContext::streaming.

Referenced by _PG_output_plugin_init().

263 {
264  PGOutputData *data = palloc0(sizeof(PGOutputData));
265 
266  /* Create our memory context for private allocations. */
268  "logical replication output context",
270 
271  ctx->output_plugin_private = data;
272 
273  /* This plugin uses binary protocol. */
275 
276  /*
277  * This is replication start and not slot initialization.
278  *
279  * Parse and validate options passed by the client.
280  */
281  if (!is_init)
282  {
283  /* Parse the params and ERROR if we see any we don't recognize */
285 
286  /* Check if we support requested protocol */
288  ereport(ERROR,
289  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
290  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
292 
294  ereport(ERROR,
295  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
296  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
298 
299  if (list_length(data->publication_names) < 1)
300  ereport(ERROR,
301  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
302  errmsg("publication_names parameter missing")));
303 
304  /*
305  * Decide whether to enable streaming. It is disabled by default, in
306  * which case we just update the flag in decoding context. Otherwise
307  * we only allow it with sufficient version of the protocol, and when
308  * the output plugin supports it.
309  */
310  if (!data->streaming)
311  ctx->streaming = false;
313  ereport(ERROR,
314  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
315  errmsg("requested proto_version=%d does not support streaming, need %d or higher",
317  else if (!ctx->streaming)
318  ereport(ERROR,
319  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
320  errmsg("streaming requested, but not supported by output plugin")));
321 
322  /* Also remember we're currently not streaming any transaction. */
323  in_streaming = false;
324 
325  /* Init publication state. */
326  data->publications = NIL;
327  publications_valid = false;
330  (Datum) 0);
331 
332  /* Initialize relation schema cache. */
334  }
335  else
336  {
337  /* Disable streaming during slot initialization. */
338  ctx->streaming = false;
339  }
340 }
#define NIL
Definition: pg_list.h:65
#define AllocSetContextCreate
Definition: memutils.h:173
static bool publications_valid
Definition: pgoutput.c:65
static bool in_streaming
Definition: pgoutput.c:66
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:789
static void parse_output_parameters(List *options, PGOutputData *data)
Definition: pgoutput.c:165
MemoryContext context
Definition: pgoutput.h:20
int errcode(int sqlerrcode)
Definition: elog.c:698
void * output_plugin_private
Definition: logical.h:78
MemoryContext context
Definition: logical.h:37
List * output_plugin_options
Definition: logical.h:61
OutputPluginOutputType output_type
Definition: output_plugin.h:28
#define ERROR
Definition: elog.h:46
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:31
List * publication_names
Definition: pgoutput.h:25
#define LOGICALREP_PROTO_MAX_VERSION_NUM
Definition: logicalproto.h:34
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1434
void * palloc0(Size size)
Definition: mcxt.c:1093
uintptr_t Datum
Definition: postgres.h:411
#define ereport(elevel,...)
Definition: elog.h:157
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:33
static int list_length(const List *l)
Definition: pg_list.h:149
List * publications
Definition: pgoutput.h:26
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:921
int errmsg(const char *fmt,...)
Definition: elog.c:909
bool streaming
Definition: pgoutput.h:28
MemoryContext CacheMemoryContext
Definition: mcxt.c:51
uint32 protocol_version
Definition: pgoutput.h:24

◆ pgoutput_stream_abort()

static void pgoutput_stream_abort ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 864 of file pgoutput.c.

References Assert, cleanup_rel_sync_cache(), in_streaming, logicalrep_write_stream_abort(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, ReorderBufferTXN::toptxn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

867 {
868  ReorderBufferTXN *toptxn;
869 
870  /*
871  * The abort should happen outside streaming block, even for streamed
872  * transactions. The transaction has to be marked as streamed, though.
873  */
875 
876  /* determine the toplevel transaction */
877  toptxn = (txn->toptxn) ? txn->toptxn : txn;
878 
879  Assert(rbtxn_is_streamed(toptxn));
880 
881  OutputPluginPrepareWrite(ctx, true);
882  logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
883  OutputPluginWrite(ctx, true);
884 
885  cleanup_rel_sync_cache(toptxn->xid, false);
886 }
static bool in_streaming
Definition: pgoutput.c:66
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid)
Definition: proto.c:878
#define rbtxn_is_streamed(txn)
struct ReorderBufferTXN * toptxn
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:639
#define Assert(condition)
Definition: c.h:804
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:1129
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:652
StringInfo out
Definition: logical.h:73

◆ pgoutput_stream_commit()

static void pgoutput_stream_commit ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 893 of file pgoutput.c.

References Assert, cleanup_rel_sync_cache(), in_streaming, logicalrep_write_stream_commit(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), rbtxn_is_streamed, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

896 {
897  /*
898  * The commit should happen outside streaming block, even for streamed
899  * transactions. The transaction has to be marked as streamed, though.
900  */
903 
905 
906  OutputPluginPrepareWrite(ctx, true);
907  logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
908  OutputPluginWrite(ctx, true);
909 
910  cleanup_rel_sync_cache(txn->xid, true);
911 }
static bool in_streaming
Definition: pgoutput.c:66
#define rbtxn_is_streamed(txn)
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:827
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:665
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:639
#define Assert(condition)
Definition: c.h:804
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:1129
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:652
StringInfo out
Definition: logical.h:73

◆ pgoutput_stream_start()

static void pgoutput_stream_start ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 804 of file pgoutput.c.

References Assert, in_streaming, InvalidRepOriginId, InvalidXLogRecPtr, logicalrep_write_origin(), logicalrep_write_stream_start(), ReorderBufferTXN::origin_id, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, replorigin_by_oid(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

806 {
807  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
808 
809  /* we can't nest streaming of transactions */
811 
812  /*
813  * If we already sent the first stream for this transaction then don't
814  * send the origin id in the subsequent streams.
815  */
816  if (rbtxn_is_streamed(txn))
817  send_replication_origin = false;
818 
819  OutputPluginPrepareWrite(ctx, !send_replication_origin);
821 
822  if (send_replication_origin)
823  {
824  char *origin;
825 
826  if (replorigin_by_oid(txn->origin_id, true, &origin))
827  {
828  /* Message boundary */
829  OutputPluginWrite(ctx, false);
830  OutputPluginPrepareWrite(ctx, true);
832  }
833  }
834 
835  OutputPluginWrite(ctx, true);
836 
837  /* we're streaming a chunk of transaction now */
838  in_streaming = true;
839 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
static bool in_streaming
Definition: pgoutput.c:66
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:113
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:449
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:784
#define rbtxn_is_streamed(txn)
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:639
#define Assert(condition)
Definition: c.h:804
#define InvalidRepOriginId
Definition: origin.h:33
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:652
StringInfo out
Definition: logical.h:73

◆ pgoutput_stream_stop()

static void pgoutput_stream_stop ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 845 of file pgoutput.c.

References Assert, in_streaming, logicalrep_write_stream_stop(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

847 {
848  /* we should be streaming a transaction */
850 
851  OutputPluginPrepareWrite(ctx, true);
853  OutputPluginWrite(ctx, true);
854 
855  /* we've stopped streaming a transaction */
856  in_streaming = false;
857 }
static bool in_streaming
Definition: pgoutput.c:66
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:818
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:639
#define Assert(condition)
Definition: c.h:804
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:652
StringInfo out
Definition: logical.h:73

◆ pgoutput_truncate()

static void pgoutput_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 649 of file pgoutput.c.

References PGOutputData::context, ReorderBufferChange::data, get_rel_sync_entry(), i, in_streaming, InvalidTransactionId, is_publishable_relation(), logicalrep_write_truncate(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), palloc0(), RelationData::rd_rel, RelationGetRelid, RelationSyncEntry::relid, ReorderBufferChange::truncate, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

651 {
653  MemoryContext old;
654  RelationSyncEntry *relentry;
655  int i;
656  int nrelids;
657  Oid *relids;
659 
660  /* Remember the xid for the change in streaming mode. See pgoutput_change. */
661  if (in_streaming)
662  xid = change->txn->xid;
663 
664  old = MemoryContextSwitchTo(data->context);
665 
666  relids = palloc0(nrelations * sizeof(Oid));
667  nrelids = 0;
668 
669  for (i = 0; i < nrelations; i++)
670  {
671  Relation relation = relations[i];
672  Oid relid = RelationGetRelid(relation);
673 
674  if (!is_publishable_relation(relation))
675  continue;
676 
677  relentry = get_rel_sync_entry(data, relid);
678 
679  if (!relentry->pubactions.pubtruncate)
680  continue;
681 
682  /*
683  * Don't send partitions if the publication wants to send only the
684  * root tables through it.
685  */
686  if (relation->rd_rel->relispartition &&
687  relentry->publish_as_relid != relid)
688  continue;
689 
690  relids[nrelids++] = relid;
691  maybe_send_schema(ctx, txn, change, relation, relentry);
692  }
693 
694  if (nrelids > 0)
695  {
696  OutputPluginPrepareWrite(ctx, true);
698  xid,
699  nrelids,
700  relids,
701  change->data.truncate.cascade,
702  change->data.truncate.restart_seqs);
703  OutputPluginWrite(ctx, true);
704  }
705 
708 }
static bool in_streaming
Definition: pgoutput.c:66
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:312
uint32 TransactionId
Definition: c.h:587
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:399
MemoryContext context
Definition: pgoutput.h:20
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:87
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:78
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
Form_pg_class rd_rel
Definition: rel.h:110
unsigned int Oid
Definition: postgres_ext.h:31
bool is_publishable_relation(Relation rel)
#define InvalidTransactionId
Definition: transam.h:31
union ReorderBufferChange::@97 data
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:988
void * palloc0(Size size)
Definition: mcxt.c:1093
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:639
struct ReorderBufferChange::@97::@99 truncate
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:652
StringInfo out
Definition: logical.h:73
int i
#define RelationGetRelid(relation)
Definition: rel.h:457

◆ publication_invalidation_cb()

static void publication_invalidation_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 789 of file pgoutput.c.

References publications_valid, and rel_sync_cache_publication_cb().

Referenced by pgoutput_startup().

790 {
791  publications_valid = false;
792 
793  /*
794  * Also invalidate per-relation cache so that next time the filtering info
795  * is checked it will be updated with the new publication settings.
796  */
797  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
798 }
static bool publications_valid
Definition: pgoutput.c:65
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1207
void * arg

◆ rel_sync_cache_publication_cb()

static void rel_sync_cache_publication_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1207 of file pgoutput.c.

References hash_seq_init(), hash_seq_search(), RelationSyncEntry::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, PublicationActions::pubtruncate, PublicationActions::pubupdate, RelationSyncEntry::replicate_valid, and status().

Referenced by init_rel_sync_cache(), and publication_invalidation_cb().

1208 {
1210  RelationSyncEntry *entry;
1211 
1212  /*
1213  * We can get here if the plugin was used in SQL interface as the
1214  * RelationSyncCache is destroyed when the decoding finishes, but there
1215  * is no way to unregister the syscache invalidation callback.
1216  */
1217  if (RelationSyncCache == NULL)
1218  return;
1219 
1220  /*
1221  * There is no way to find which entry in our cache the hash belongs to so
1222  * mark the whole cache as invalid.
1223  */
1224  hash_seq_init(&status, RelationSyncCache);
1225  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
1226  {
1227  entry->replicate_valid = false;
1228 
1229  /*
1230  * There might be some relations dropped from the publication so we
1231  * don't need to publish the changes for them.
1232  */
1233  entry->pubactions.pubinsert = false;
1234  entry->pubactions.pubupdate = false;
1235  entry->pubactions.pubdelete = false;
1236  entry->pubactions.pubtruncate = false;
1237  }
1238 }
bool replicate_valid
Definition: pgoutput.c:103
static HTAB * RelationSyncCache
Definition: pgoutput.c:124
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
PublicationActions pubactions
Definition: pgoutput.c:104

◆ rel_sync_cache_relation_cb()

static void rel_sync_cache_relation_cb ( Datum  arg,
Oid  relid 
)
static

Definition at line 1165 of file pgoutput.c.

References HASH_FIND, hash_search(), list_free(), NIL, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by init_rel_sync_cache().

1166 {
1167  RelationSyncEntry *entry;
1168 
1169  /*
1170  * We can get here if the plugin was used in SQL interface as the
1171  * RelationSyncCache is destroyed when the decoding finishes, but there
1172  * is no way to unregister the relcache invalidation callback.
1173  */
1174  if (RelationSyncCache == NULL)
1175  return;
1176 
1177  /*
1178  * Nobody keeps pointers to entries in this hash table around outside
1179  * logical decoding callback calls - but invalidation events can come in
1180  * *during* a callback if we access the relcache in the callback. Because
1181  * of that we must mark the cache entry as invalid but not remove it from
1182  * the hash while it could still be referenced, then prune it at a later
1183  * safe point.
1184  *
1185  * Getting invalidations for relations that aren't in the table is
1186  * entirely normal, since there's no way to unregister for an invalidation
1187  * event. So we don't care if it's found or not.
1188  */
1189  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
1190  HASH_FIND, NULL);
1191 
1192  /*
1193  * Reset schema sent status as the relation definition may have changed.
1194  */
1195  if (entry != NULL)
1196  {
1197  entry->schema_sent = false;
1198  list_free(entry->streamed_txns);
1199  entry->streamed_txns = NIL;
1200  }
1201 }
List * streamed_txns
Definition: pgoutput.c:100
#define NIL
Definition: pg_list.h:65
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
static HTAB * RelationSyncCache
Definition: pgoutput.c:124
void list_free(List *list)
Definition: list.c:1391

◆ send_relation_and_attrs()

static void send_relation_and_attrs ( Relation  relation,
TransactionId  xid,
LogicalDecodingContext ctx 
)
static

Definition at line 472 of file pgoutput.c.

References FirstGenbkiObjectId, i, logicalrep_write_rel(), logicalrep_write_typ(), TupleDescData::natts, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetDescr, and TupleDescAttr.

Referenced by maybe_send_schema().

474 {
475  TupleDesc desc = RelationGetDescr(relation);
476  int i;
477 
478  /*
479  * Write out type info if needed. We do that only for user-created types.
480  * We use FirstGenbkiObjectId as the cutoff, so that we only consider
481  * objects with hand-assigned OIDs to be "built in", not for instance any
482  * function or type defined in the information_schema. This is important
483  * because only hand-assigned OIDs can be expected to remain stable across
484  * major versions.
485  */
486  for (i = 0; i < desc->natts; i++)
487  {
488  Form_pg_attribute att = TupleDescAttr(desc, i);
489 
490  if (att->attisdropped || att->attgenerated)
491  continue;
492 
493  if (att->atttypid < FirstGenbkiObjectId)
494  continue;
495 
496  OutputPluginPrepareWrite(ctx, false);
497  logicalrep_write_typ(ctx->out, xid, att->atttypid);
498  OutputPluginWrite(ctx, false);
499  }
500 
501  OutputPluginPrepareWrite(ctx, false);
502  logicalrep_write_rel(ctx->out, xid, relation);
503  OutputPluginWrite(ctx, false);
504 }
#define RelationGetDescr(relation)
Definition: rel.h:483
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
Definition: proto.c:396
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:203
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:639
#define FirstGenbkiObjectId
Definition: transam.h:188
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:652
StringInfo out
Definition: logical.h:73
int i
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:450

◆ set_schema_sent_in_streamed_txn()

static void set_schema_sent_in_streamed_txn ( RelationSyncEntry entry,
TransactionId  xid 
)
static

Definition at line 967 of file pgoutput.c.

References CacheMemoryContext, lappend_int(), MemoryContextSwitchTo(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

968 {
969  MemoryContext oldctx;
970 
972 
973  entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
974 
975  MemoryContextSwitchTo(oldctx);
976 }
List * streamed_txns
Definition: pgoutput.c:100
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
List * lappend_int(List *list, int datum)
Definition: list.c:354
MemoryContext CacheMemoryContext
Definition: mcxt.c:51

Variable Documentation

◆ in_streaming

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 31 of file pgoutput.c.

◆ publications_valid

bool publications_valid
static

Definition at line 65 of file pgoutput.c.

Referenced by get_rel_sync_entry(), pgoutput_startup(), and publication_invalidation_cb().

◆ RelationSyncCache

HTAB* RelationSyncCache = NULL
static

Definition at line 124 of file pgoutput.c.