PostgreSQL Source Code  git master
pgoutput.c File Reference
#include "postgres.h"
#include "access/tupconvert.h"
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "commands/defrem.h"
#include "fmgr.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
#include "utils/int8.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
Include dependency graph for pgoutput.c:

Go to the source code of this file.

Data Structures

struct  RelationSyncEntry
 

Typedefs

typedef struct RelationSyncEntry RelationSyncEntry
 

Functions

void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void pgoutput_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pgoutput_shutdown (LogicalDecodingContext *ctx)
 
static void pgoutput_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
 
static void pgoutput_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static void pgoutput_message (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
 
static bool pgoutput_origin_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static void pgoutput_begin_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static void pgoutput_commit_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_rollback_prepared_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
 
static void pgoutput_stream_start (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_stop (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_stream_abort (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
 
static void pgoutput_stream_commit (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_stream_prepare_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
 
static ListLoadPublications (List *pubnames)
 
static void publication_invalidation_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void send_relation_and_attrs (Relation relation, TransactionId xid, LogicalDecodingContext *ctx)
 
static void send_repl_origin (LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
 
static void init_rel_sync_cache (MemoryContext decoding_context)
 
static void cleanup_rel_sync_cache (TransactionId xid, bool is_commit)
 
static RelationSyncEntryget_rel_sync_entry (PGOutputData *data, Oid relid)
 
static void rel_sync_cache_relation_cb (Datum arg, Oid relid)
 
static void rel_sync_cache_publication_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void set_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static bool get_schema_sent_in_streamed_txn (RelationSyncEntry *entry, TransactionId xid)
 
static void parse_output_parameters (List *options, PGOutputData *data)
 
static void maybe_send_schema (LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
 

Variables

 PG_MODULE_MAGIC
 
static bool publications_valid
 
static bool in_streaming
 
static HTABRelationSyncCache = NULL
 

Typedef Documentation

◆ RelationSyncEntry

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks cb)

Definition at line 153 of file pgoutput.c.

References AssertVariableIsOfType, OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::begin_prepare_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::commit_prepared_cb, OutputPluginCallbacks::filter_by_origin_cb, OutputPluginCallbacks::message_cb, pgoutput_begin_prepare_txn(), pgoutput_begin_txn(), pgoutput_change(), pgoutput_commit_prepared_txn(), pgoutput_commit_txn(), pgoutput_message(), pgoutput_origin_filter(), pgoutput_prepare_txn(), pgoutput_rollback_prepared_txn(), pgoutput_shutdown(), pgoutput_startup(), pgoutput_stream_abort(), pgoutput_stream_commit(), pgoutput_stream_prepare_txn(), pgoutput_stream_start(), pgoutput_stream_stop(), pgoutput_truncate(), OutputPluginCallbacks::prepare_cb, OutputPluginCallbacks::rollback_prepared_cb, OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, OutputPluginCallbacks::stream_abort_cb, OutputPluginCallbacks::stream_change_cb, OutputPluginCallbacks::stream_commit_cb, OutputPluginCallbacks::stream_message_cb, OutputPluginCallbacks::stream_prepare_cb, OutputPluginCallbacks::stream_start_cb, OutputPluginCallbacks::stream_stop_cb, OutputPluginCallbacks::stream_truncate_cb, and OutputPluginCallbacks::truncate_cb.

154 {
156 
163 
170 
171  /* transaction streaming */
179  /* transaction streaming - two-phase commit */
181 }
LogicalDecodeTruncateCB truncate_cb
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:629
LogicalDecodeStreamPrepareCB stream_prepare_cb
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:153
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:924
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:876
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:419
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamMessageCB stream_message_cb
static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: pgoutput.c:831
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:1029
LogicalDecodePrepareCB prepare_cb
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:1003
LogicalDecodeCommitCB commit_cb
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:292
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:433
static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: pgoutput.c:478
LogicalDecodeChangeCB change_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:464
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:863
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeStartupCB startup_cb
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:450
LogicalDecodeStreamStartCB stream_start_cb
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:955
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamStopCB stream_stop_cb
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:769
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:402
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeStreamChangeCB stream_change_cb
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: pgoutput.c:974
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:963

◆ cleanup_rel_sync_cache()

static void cleanup_rel_sync_cache ( TransactionId  xid,
bool  is_commit 
)
static

Definition at line 1259 of file pgoutput.c.

References Assert, foreach_delete_current, hash_seq_init(), hash_seq_search(), lfirst_int, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_stream_abort(), and pgoutput_stream_commit().

1260 {
1261  HASH_SEQ_STATUS hash_seq;
1262  RelationSyncEntry *entry;
1263  ListCell *lc;
1264 
1265  Assert(RelationSyncCache != NULL);
1266 
1267  hash_seq_init(&hash_seq, RelationSyncCache);
1268  while ((entry = hash_seq_search(&hash_seq)) != NULL)
1269  {
1270  /*
1271  * We can set the schema_sent flag for an entry that has committed xid
1272  * in the list as that ensures that the subscriber would have the
1273  * corresponding schema and we don't need to send it unless there is
1274  * any invalidation for that relation.
1275  */
1276  foreach(lc, entry->streamed_txns)
1277  {
1278  if (xid == (uint32) lfirst_int(lc))
1279  {
1280  if (is_commit)
1281  entry->schema_sent = true;
1282 
1283  entry->streamed_txns =
1285  break;
1286  }
1287  }
1288  }
1289 }
List * streamed_txns
Definition: pgoutput.c:112
#define foreach_delete_current(lst, cell)
Definition: pg_list.h:369
#define lfirst_int(lc)
Definition: pg_list.h:170
unsigned int uint32
Definition: c.h:441
static HTAB * RelationSyncCache
Definition: pgoutput.c:136
#define Assert(condition)
Definition: c.h:804
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426

◆ get_rel_sync_entry()

static RelationSyncEntry * get_rel_sync_entry ( PGOutputData data,
Oid  relid 
)
static

Definition at line 1116 of file pgoutput.c.

References Publication::alltables, Assert, CacheMemoryContext, get_partition_ancestors(), get_rel_relispartition(), get_rel_relkind(), GetRelationPublications(), HASH_ENTER, hash_search(), InvalidOid, lfirst, lfirst_oid, list_free(), list_free_deep(), list_member_oid(), llast_oid, LoadPublications(), RelationSyncEntry::map, MemoryContextSwitchTo(), NIL, Publication::oid, Publication::pubactions, RelationSyncEntry::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, PGOutputData::publication_names, PGOutputData::publications, publications_valid, RelationSyncEntry::publish_as_relid, PublicationActions::pubtruncate, PublicationActions::pubupdate, Publication::pubviaroot, RelationSyncEntry::relid, RelationSyncEntry::replicate_valid, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by pgoutput_change(), and pgoutput_truncate().

1117 {
1118  RelationSyncEntry *entry;
1119  bool am_partition = get_rel_relispartition(relid);
1120  char relkind = get_rel_relkind(relid);
1121  bool found;
1122  MemoryContext oldctx;
1123 
1124  Assert(RelationSyncCache != NULL);
1125 
1126  /* Find cached relation info, creating if not found */
1128  (void *) &relid,
1129  HASH_ENTER, &found);
1130  Assert(entry != NULL);
1131 
1132  /* Not found means schema wasn't sent */
1133  if (!found)
1134  {
1135  /* immediately make a new entry valid enough to satisfy callbacks */
1136  entry->schema_sent = false;
1137  entry->streamed_txns = NIL;
1138  entry->replicate_valid = false;
1139  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
1140  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
1141  entry->publish_as_relid = InvalidOid;
1142  entry->map = NULL; /* will be set by maybe_send_schema() if
1143  * needed */
1144  }
1145 
1146  /* Validate the entry */
1147  if (!entry->replicate_valid)
1148  {
1149  List *pubids = GetRelationPublications(relid);
1150  ListCell *lc;
1151  Oid publish_as_relid = relid;
1152 
1153  /* Reload publications if needed before use. */
1154  if (!publications_valid)
1155  {
1157  if (data->publications)
1159 
1161  MemoryContextSwitchTo(oldctx);
1162  publications_valid = true;
1163  }
1164 
1165  /*
1166  * Build publication cache. We can't use one provided by relcache as
1167  * relcache considers all publications given relation is in, but here
1168  * we only need to consider ones that the subscriber requested.
1169  */
1170  foreach(lc, data->publications)
1171  {
1172  Publication *pub = lfirst(lc);
1173  bool publish = false;
1174 
1175  if (pub->alltables)
1176  {
1177  publish = true;
1178  if (pub->pubviaroot && am_partition)
1179  publish_as_relid = llast_oid(get_partition_ancestors(relid));
1180  }
1181 
1182  if (!publish)
1183  {
1184  bool ancestor_published = false;
1185 
1186  /*
1187  * For a partition, check if any of the ancestors are
1188  * published. If so, note down the topmost ancestor that is
1189  * published via this publication, which will be used as the
1190  * relation via which to publish the partition's changes.
1191  */
1192  if (am_partition)
1193  {
1194  List *ancestors = get_partition_ancestors(relid);
1195  ListCell *lc2;
1196 
1197  /*
1198  * Find the "topmost" ancestor that is in this
1199  * publication.
1200  */
1201  foreach(lc2, ancestors)
1202  {
1203  Oid ancestor = lfirst_oid(lc2);
1204 
1206  pub->oid))
1207  {
1208  ancestor_published = true;
1209  if (pub->pubviaroot)
1210  publish_as_relid = ancestor;
1211  }
1212  }
1213  }
1214 
1215  if (list_member_oid(pubids, pub->oid) || ancestor_published)
1216  publish = true;
1217  }
1218 
1219  /*
1220  * Don't publish changes for partitioned tables, because
1221  * publishing those of its partitions suffices, unless partition
1222  * changes won't be published due to pubviaroot being set.
1223  */
1224  if (publish &&
1225  (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
1226  {
1227  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
1228  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
1229  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
1231  }
1232 
1233  if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
1234  entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
1235  break;
1236  }
1237 
1238  list_free(pubids);
1239 
1240  entry->publish_as_relid = publish_as_relid;
1241  entry->replicate_valid = true;
1242  }
1243 
1244  return entry;
1245 }
List * streamed_txns
Definition: pgoutput.c:112
#define NIL
Definition: pg_list.h:65
PublicationActions pubactions
static bool publications_valid
Definition: pgoutput.c:77
bool replicate_valid
Definition: pgoutput.c:115
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1974
TupleConversionMap * map
Definition: pgoutput.c:132
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define llast_oid(l)
Definition: pg_list.h:196
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
unsigned int Oid
Definition: postgres_ext.h:31
void list_free_deep(List *list)
Definition: list.c:1405
List * GetRelationPublications(Oid relid)
List * publication_names
Definition: pgoutput.h:25
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:1998
static HTAB * RelationSyncCache
Definition: pgoutput.c:136
#define InvalidOid
Definition: postgres_ext.h:36
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:689
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:889
List * publications
Definition: pgoutput.h:26
void list_free(List *list)
Definition: list.c:1391
Definition: pg_list.h:50
List * get_partition_ancestors(Oid relid)
Definition: partition.c:133
#define lfirst_oid(lc)
Definition: pg_list.h:171
MemoryContext CacheMemoryContext
Definition: mcxt.c:51
PublicationActions pubactions
Definition: pgoutput.c:116

◆ get_schema_sent_in_streamed_txn()

static bool get_schema_sent_in_streamed_txn ( RelationSyncEntry entry,
TransactionId  xid 
)
static

Definition at line 1077 of file pgoutput.c.

References lfirst_int, and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

1078 {
1079  ListCell *lc;
1080 
1081  foreach(lc, entry->streamed_txns)
1082  {
1083  if (xid == (uint32) lfirst_int(lc))
1084  return true;
1085  }
1086 
1087  return false;
1088 }
List * streamed_txns
Definition: pgoutput.c:112
#define lfirst_int(lc)
Definition: pg_list.h:170
unsigned int uint32
Definition: c.h:441

◆ init_rel_sync_cache()

static void init_rel_sync_cache ( MemoryContext  decoding_context)
static

Definition at line 1049 of file pgoutput.c.

References Assert, CacheRegisterRelcacheCallback(), CacheRegisterSyscacheCallback(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, PUBLICATIONRELMAP, rel_sync_cache_publication_cb(), and rel_sync_cache_relation_cb().

Referenced by pgoutput_startup().

1050 {
1051  HASHCTL ctl;
1052 
1053  if (RelationSyncCache != NULL)
1054  return;
1055 
1056  /* Make a new hash table for the cache */
1057  ctl.keysize = sizeof(Oid);
1058  ctl.entrysize = sizeof(RelationSyncEntry);
1059  ctl.hcxt = cachectx;
1060 
1061  RelationSyncCache = hash_create("logical replication output relation cache",
1062  128, &ctl,
1064 
1065  Assert(RelationSyncCache != NULL);
1066 
1070  (Datum) 0);
1071 }
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
MemoryContext hcxt
Definition: hsearch.h:86
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:1295
Size entrysize
Definition: hsearch.h:76
unsigned int Oid
Definition: postgres_ext.h:31
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1349
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1540
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:349
static HTAB * RelationSyncCache
Definition: pgoutput.c:136
#define HASH_BLOBS
Definition: hsearch.h:97
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1498
uintptr_t Datum
Definition: postgres.h:411
Size keysize
Definition: hsearch.h:75
#define Assert(condition)
Definition: c.h:804

◆ LoadPublications()

static List * LoadPublications ( List pubnames)
static

Definition at line 889 of file pgoutput.c.

References GetPublicationByName(), lappend(), lfirst, and NIL.

Referenced by get_rel_sync_entry().

890 {
891  List *result = NIL;
892  ListCell *lc;
893 
894  foreach(lc, pubnames)
895  {
896  char *pubname = (char *) lfirst(lc);
897  Publication *pub = GetPublicationByName(pubname, false);
898 
899  result = lappend(result, pub);
900  }
901 
902  return result;
903 }
#define NIL
Definition: pg_list.h:65
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
List * lappend(List *list, void *datum)
Definition: list.c:336
#define lfirst(lc)
Definition: pg_list.h:169
Definition: pg_list.h:50

◆ maybe_send_schema()

static void maybe_send_schema ( LogicalDecodingContext ctx,
ReorderBufferChange change,
Relation  relation,
RelationSyncEntry relentry 
)
static

Definition at line 496 of file pgoutput.c.

References CacheMemoryContext, convert_tuples_by_name(), CreateTupleDescCopy(), FreeTupleDesc(), get_schema_sent_in_streamed_txn(), in_streaming, InvalidTransactionId, RelationSyncEntry::map, MemoryContextSwitchTo(), RelationSyncEntry::publish_as_relid, RelationClose(), RelationGetDescr, RelationGetRelid, RelationIdGetRelation(), RelationSyncEntry::schema_sent, send_relation_and_attrs(), set_schema_sent_in_streamed_txn(), ReorderBufferTXN::toptxn, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by pgoutput_change(), and pgoutput_truncate().

499 {
500  bool schema_sent;
503 
504  /*
505  * Remember XID of the (sub)transaction for the change. We don't care if
506  * it's top-level transaction or not (we have already sent that XID in
507  * start of the current streaming block).
508  *
509  * If we're not in a streaming block, just use InvalidTransactionId and
510  * the write methods will not include it.
511  */
512  if (in_streaming)
513  xid = change->txn->xid;
514 
515  if (change->txn->toptxn)
516  topxid = change->txn->toptxn->xid;
517  else
518  topxid = xid;
519 
520  /*
521  * Do we need to send the schema? We do track streamed transactions
522  * separately, because those may be applied later (and the regular
523  * transactions won't see their effects until then) and in an order that
524  * we don't know at this point.
525  *
526  * XXX There is a scope of optimization here. Currently, we always send
527  * the schema first time in a streaming transaction but we can probably
528  * avoid that by checking 'relentry->schema_sent' flag. However, before
529  * doing that we need to study its impact on the case where we have a mix
530  * of streaming and non-streaming transactions.
531  */
532  if (in_streaming)
533  schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
534  else
535  schema_sent = relentry->schema_sent;
536 
537  /* Nothing to do if we already sent the schema. */
538  if (schema_sent)
539  return;
540 
541  /*
542  * Nope, so send the schema. If the changes will be published using an
543  * ancestor's schema, not the relation's own, send that ancestor's schema
544  * before sending relation's own (XXX - maybe sending only the former
545  * suffices?). This is also a good place to set the map that will be used
546  * to convert the relation's tuples into the ancestor's format, if needed.
547  */
548  if (relentry->publish_as_relid != RelationGetRelid(relation))
549  {
550  Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
551  TupleDesc indesc = RelationGetDescr(relation);
552  TupleDesc outdesc = RelationGetDescr(ancestor);
553  MemoryContext oldctx;
554 
555  /* Map must live as long as the session does. */
557 
558  /*
559  * Make copies of the TupleDescs that will live as long as the map
560  * does before putting into the map.
561  */
562  indesc = CreateTupleDescCopy(indesc);
563  outdesc = CreateTupleDescCopy(outdesc);
564  relentry->map = convert_tuples_by_name(indesc, outdesc);
565  if (relentry->map == NULL)
566  {
567  /* Map not necessary, so free the TupleDescs too. */
568  FreeTupleDesc(indesc);
569  FreeTupleDesc(outdesc);
570  }
571 
572  MemoryContextSwitchTo(oldctx);
573  send_relation_and_attrs(ancestor, xid, ctx);
574  RelationClose(ancestor);
575  }
576 
577  send_relation_and_attrs(relation, xid, ctx);
578 
579  if (in_streaming)
580  set_schema_sent_in_streamed_txn(relentry, topxid);
581  else
582  relentry->schema_sent = true;
583 }
TupleDesc CreateTupleDescCopy(TupleDesc tupdesc)
Definition: tupdesc.c:111
static bool in_streaming
Definition: pgoutput.c:78
uint32 TransactionId
Definition: c.h:587
#define RelationGetDescr(relation)
Definition: rel.h:503
TupleConversionMap * map
Definition: pgoutput.c:132
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx)
Definition: pgoutput.c:589
TupleConversionMap * convert_tuples_by_name(TupleDesc indesc, TupleDesc outdesc)
Definition: tupconvert.c:102
#define InvalidTransactionId
Definition: transam.h:31
struct ReorderBufferTXN * toptxn
void RelationClose(Relation relation)
Definition: relcache.c:2101
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1077
TransactionId xid
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1095
void FreeTupleDesc(TupleDesc tupdesc)
Definition: tupdesc.c:309
#define RelationGetRelid(relation)
Definition: rel.h:477
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1995
MemoryContext CacheMemoryContext
Definition: mcxt.c:51

◆ parse_output_parameters()

static void parse_output_parameters ( List options,
PGOutputData data 
)
static

Definition at line 184 of file pgoutput.c.

References DefElem::arg, Assert, PGOutputData::binary, defGetBoolean(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, IsA, lfirst, PGOutputData::messages, PG_UINT32_MAX, PGOutputData::protocol_version, PGOutputData::publication_names, scanint8(), SplitIdentifierString(), PGOutputData::streaming, strVal, and PGOutputData::two_phase.

Referenced by pgoutput_startup().

185 {
186  ListCell *lc;
187  bool protocol_version_given = false;
188  bool publication_names_given = false;
189  bool binary_option_given = false;
190  bool messages_option_given = false;
191  bool streaming_given = false;
192  bool two_phase_option_given = false;
193 
194  data->binary = false;
195  data->streaming = false;
196  data->messages = false;
197  data->two_phase = false;
198 
199  foreach(lc, options)
200  {
201  DefElem *defel = (DefElem *) lfirst(lc);
202 
203  Assert(defel->arg == NULL || IsA(defel->arg, String));
204 
205  /* Check each param, whether or not we recognize it */
206  if (strcmp(defel->defname, "proto_version") == 0)
207  {
208  int64 parsed;
209 
210  if (protocol_version_given)
211  ereport(ERROR,
212  (errcode(ERRCODE_SYNTAX_ERROR),
213  errmsg("conflicting or redundant options")));
214  protocol_version_given = true;
215 
216  if (!scanint8(strVal(defel->arg), true, &parsed))
217  ereport(ERROR,
218  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
219  errmsg("invalid proto_version")));
220 
221  if (parsed > PG_UINT32_MAX || parsed < 0)
222  ereport(ERROR,
223  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
224  errmsg("proto_version \"%s\" out of range",
225  strVal(defel->arg))));
226 
227  data->protocol_version = (uint32) parsed;
228  }
229  else if (strcmp(defel->defname, "publication_names") == 0)
230  {
231  if (publication_names_given)
232  ereport(ERROR,
233  (errcode(ERRCODE_SYNTAX_ERROR),
234  errmsg("conflicting or redundant options")));
235  publication_names_given = true;
236 
237  if (!SplitIdentifierString(strVal(defel->arg), ',',
238  &data->publication_names))
239  ereport(ERROR,
240  (errcode(ERRCODE_INVALID_NAME),
241  errmsg("invalid publication_names syntax")));
242  }
243  else if (strcmp(defel->defname, "binary") == 0)
244  {
245  if (binary_option_given)
246  ereport(ERROR,
247  (errcode(ERRCODE_SYNTAX_ERROR),
248  errmsg("conflicting or redundant options")));
249  binary_option_given = true;
250 
251  data->binary = defGetBoolean(defel);
252  }
253  else if (strcmp(defel->defname, "messages") == 0)
254  {
255  if (messages_option_given)
256  ereport(ERROR,
257  (errcode(ERRCODE_SYNTAX_ERROR),
258  errmsg("conflicting or redundant options")));
259  messages_option_given = true;
260 
261  data->messages = defGetBoolean(defel);
262  }
263  else if (strcmp(defel->defname, "streaming") == 0)
264  {
265  if (streaming_given)
266  ereport(ERROR,
267  (errcode(ERRCODE_SYNTAX_ERROR),
268  errmsg("conflicting or redundant options")));
269  streaming_given = true;
270 
271  data->streaming = defGetBoolean(defel);
272  }
273  else if (strcmp(defel->defname, "two_phase") == 0)
274  {
275  if (two_phase_option_given)
276  ereport(ERROR,
277  (errcode(ERRCODE_SYNTAX_ERROR),
278  errmsg("conflicting or redundant options")));
279  two_phase_option_given = true;
280 
281  data->two_phase = defGetBoolean(defel);
282  }
283  else
284  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
285  }
286 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:587
#define strVal(v)
Definition: value.h:65
int errcode(int sqlerrcode)
Definition: elog.c:698
Definition: value.h:51
#define PG_UINT32_MAX
Definition: c.h:525
bool defGetBoolean(DefElem *def)
Definition: define.c:106
#define ERROR
Definition: elog.h:46
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3746
bool messages
Definition: pgoutput.h:29
List * publication_names
Definition: pgoutput.h:25
unsigned int uint32
Definition: c.h:441
Node * arg
Definition: parsenodes.h:759
#define ereport(elevel,...)
Definition: elog.h:157
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
bool binary
Definition: pgoutput.h:27
bool two_phase
Definition: pgoutput.h:30
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define elog(elevel,...)
Definition: elog.h:232
char * defname
Definition: parsenodes.h:758
bool streaming
Definition: pgoutput.h:28
bool scanint8(const char *str, bool errorOK, int64 *result)
Definition: int8.c:55
uint32 protocol_version
Definition: pgoutput.h:24

◆ pgoutput_begin_prepare_txn()

static void pgoutput_begin_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 433 of file pgoutput.c.

References InvalidRepOriginId, logicalrep_write_begin_prepare(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and send_repl_origin().

Referenced by _PG_output_plugin_init().

434 {
435  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
436 
437  OutputPluginPrepareWrite(ctx, !send_replication_origin);
439 
440  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
441  send_replication_origin);
442 
443  OutputPluginWrite(ctx, true);
444 }
RepOriginId origin_id
XLogRecPtr origin_lsn
static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
Definition: pgoutput.c:1384
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:113
#define InvalidRepOriginId
Definition: origin.h:33
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70

◆ pgoutput_begin_txn()

static void pgoutput_begin_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 402 of file pgoutput.c.

References InvalidRepOriginId, logicalrep_write_begin(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and send_repl_origin().

Referenced by _PG_output_plugin_init().

403 {
404  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
405 
406  OutputPluginPrepareWrite(ctx, !send_replication_origin);
407  logicalrep_write_begin(ctx->out, txn);
408 
409  send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
410  send_replication_origin);
411 
412  OutputPluginWrite(ctx, true);
413 }
RepOriginId origin_id
XLogRecPtr origin_lsn
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:46
static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
Definition: pgoutput.c:1384
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
#define InvalidRepOriginId
Definition: origin.h:33
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70

◆ pgoutput_change()

static void pgoutput_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  rel,
ReorderBufferChange change 
)
static

Definition at line 629 of file pgoutput.c.

References ReorderBufferChange::action, Assert, PGOutputData::binary, PGOutputData::context, ReorderBufferChange::data, DEBUG1, elog, execute_attr_map_tuple(), get_rel_sync_entry(), in_streaming, InvalidTransactionId, is_publishable_relation(), logicalrep_write_delete(), logicalrep_write_insert(), logicalrep_write_update(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationData::rd_rel, RelationClose(), RelationGetRelid, RelationIdGetRelation(), RelationIsValid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

631 {
633  MemoryContext old;
634  RelationSyncEntry *relentry;
636  Relation ancestor = NULL;
637 
638  if (!is_publishable_relation(relation))
639  return;
640 
641  /*
642  * Remember the xid for the change in streaming mode. We need to send xid
643  * with each change in the streaming mode so that subscriber can make
644  * their association and on aborts, it can discard the corresponding
645  * changes.
646  */
647  if (in_streaming)
648  xid = change->txn->xid;
649 
650  relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
651 
652  /* First check the table filter */
653  switch (change->action)
654  {
656  if (!relentry->pubactions.pubinsert)
657  return;
658  break;
660  if (!relentry->pubactions.pubupdate)
661  return;
662  break;
664  if (!relentry->pubactions.pubdelete)
665  return;
666  break;
667  default:
668  Assert(false);
669  }
670 
671  /* Avoid leaking memory by using and resetting our own context */
672  old = MemoryContextSwitchTo(data->context);
673 
674  maybe_send_schema(ctx, change, relation, relentry);
675 
676  /* Send the data */
677  switch (change->action)
678  {
680  {
681  HeapTuple tuple = &change->data.tp.newtuple->tuple;
682 
683  /* Switch relation if publishing via root. */
684  if (relentry->publish_as_relid != RelationGetRelid(relation))
685  {
686  Assert(relation->rd_rel->relispartition);
687  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
688  relation = ancestor;
689  /* Convert tuple if needed. */
690  if (relentry->map)
691  tuple = execute_attr_map_tuple(tuple, relentry->map);
692  }
693 
694  OutputPluginPrepareWrite(ctx, true);
695  logicalrep_write_insert(ctx->out, xid, relation, tuple,
696  data->binary);
697  OutputPluginWrite(ctx, true);
698  break;
699  }
701  {
702  HeapTuple oldtuple = change->data.tp.oldtuple ?
703  &change->data.tp.oldtuple->tuple : NULL;
704  HeapTuple newtuple = &change->data.tp.newtuple->tuple;
705 
706  /* Switch relation if publishing via root. */
707  if (relentry->publish_as_relid != RelationGetRelid(relation))
708  {
709  Assert(relation->rd_rel->relispartition);
710  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
711  relation = ancestor;
712  /* Convert tuples if needed. */
713  if (relentry->map)
714  {
715  if (oldtuple)
716  oldtuple = execute_attr_map_tuple(oldtuple,
717  relentry->map);
718  newtuple = execute_attr_map_tuple(newtuple,
719  relentry->map);
720  }
721  }
722 
723  OutputPluginPrepareWrite(ctx, true);
724  logicalrep_write_update(ctx->out, xid, relation, oldtuple,
725  newtuple, data->binary);
726  OutputPluginWrite(ctx, true);
727  break;
728  }
730  if (change->data.tp.oldtuple)
731  {
732  HeapTuple oldtuple = &change->data.tp.oldtuple->tuple;
733 
734  /* Switch relation if publishing via root. */
735  if (relentry->publish_as_relid != RelationGetRelid(relation))
736  {
737  Assert(relation->rd_rel->relispartition);
738  ancestor = RelationIdGetRelation(relentry->publish_as_relid);
739  relation = ancestor;
740  /* Convert tuple if needed. */
741  if (relentry->map)
742  oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
743  }
744 
745  OutputPluginPrepareWrite(ctx, true);
746  logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
747  data->binary);
748  OutputPluginWrite(ctx, true);
749  }
750  else
751  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
752  break;
753  default:
754  Assert(false);
755  }
756 
757  if (RelationIsValid(ancestor))
758  {
759  RelationClose(ancestor);
760  ancestor = NULL;
761  }
762 
763  /* Cleanup */
766 }
#define DEBUG1
Definition: elog.h:25
static bool in_streaming
Definition: pgoutput.c:78
uint32 TransactionId
Definition: c.h:587
MemoryContext context
Definition: pgoutput.h:20
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:75
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, bool binary)
Definition: proto.c:518
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:85
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary)
Definition: proto.c:400
#define RelationIsValid(relation)
Definition: rel.h:450
bool is_publishable_relation(Relation rel)
#define InvalidTransactionId
Definition: transam.h:31
void RelationClose(Relation relation)
Definition: relcache.c:2101
HeapTuple execute_attr_map_tuple(HeapTuple tuple, TupleConversionMap *map)
Definition: tupconvert.c:139
union ReorderBufferChange::@97 data
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:1116
TransactionId xid
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:496
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
#define Assert(condition)
Definition: c.h:804
bool binary
Definition: pgoutput.h:27
struct ReorderBufferChange::@97::@98 tp
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
#define elog(elevel,...)
Definition: elog.h:232
StringInfo out
Definition: logical.h:70
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary)
Definition: proto.c:444
#define RelationGetRelid(relation)
Definition: rel.h:477
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:1995

◆ pgoutput_commit_prepared_txn()

static void pgoutput_commit_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 464 of file pgoutput.c.

References logicalrep_write_commit_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

466 {
468 
469  OutputPluginPrepareWrite(ctx, true);
470  logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
471  OutputPluginWrite(ctx, true);
472 }
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:674
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:234
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70

◆ pgoutput_commit_txn()

static void pgoutput_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 419 of file pgoutput.c.

References logicalrep_write_commit(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

421 {
423 
424  OutputPluginPrepareWrite(ctx, true);
425  logicalrep_write_commit(ctx->out, txn, commit_lsn);
426  OutputPluginWrite(ctx, true);
427 }
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:75
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:674
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70

◆ pgoutput_message()

static void pgoutput_message ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  message_lsn,
bool  transactional,
const char *  prefix,
Size  sz,
const char *  message 
)
static

Definition at line 831 of file pgoutput.c.

References in_streaming, InvalidTransactionId, logicalrep_write_message(), PGOutputData::messages, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

834 {
837 
838  if (!data->messages)
839  return;
840 
841  /*
842  * Remember the xid for the message in streaming mode. See
843  * pgoutput_change.
844  */
845  if (in_streaming)
846  xid = txn->xid;
847 
848  OutputPluginPrepareWrite(ctx, true);
850  xid,
851  message_lsn,
852  transactional,
853  prefix,
854  sz,
855  message);
856  OutputPluginWrite(ctx, true);
857 }
static bool in_streaming
Definition: pgoutput.c:78
uint32 TransactionId
Definition: c.h:587
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:627
void * output_plugin_private
Definition: logical.h:75
bool messages
Definition: pgoutput.h:29
#define InvalidTransactionId
Definition: transam.h:31
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70

◆ pgoutput_origin_filter()

static bool pgoutput_origin_filter ( LogicalDecodingContext ctx,
RepOriginId  origin_id 
)
static

Definition at line 863 of file pgoutput.c.

Referenced by _PG_output_plugin_init().

865 {
866  return false;
867 }

◆ pgoutput_prepare_txn()

static void pgoutput_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 450 of file pgoutput.c.

References logicalrep_write_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

452 {
454 
455  OutputPluginPrepareWrite(ctx, true);
456  logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
457  OutputPluginWrite(ctx, true);
458 }
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:184
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:674
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70

◆ pgoutput_rollback_prepared_txn()

static void pgoutput_rollback_prepared_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_end_lsn,
TimestampTz  prepare_time 
)
static

Definition at line 478 of file pgoutput.c.

References logicalrep_write_rollback_prepared(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

482 {
484 
485  OutputPluginPrepareWrite(ctx, true);
486  logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
487  prepare_time);
488  OutputPluginWrite(ctx, true);
489 }
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:674
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: proto.c:290
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70

◆ pgoutput_shutdown()

static void pgoutput_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 876 of file pgoutput.c.

References hash_destroy().

Referenced by _PG_output_plugin_init().

877 {
878  if (RelationSyncCache)
879  {
881  RelationSyncCache = NULL;
882  }
883 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:862
static HTAB * RelationSyncCache
Definition: pgoutput.c:136

◆ pgoutput_startup()

static void pgoutput_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 292 of file pgoutput.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CacheMemoryContext, CacheRegisterSyscacheCallback(), PGOutputData::context, LogicalDecodingContext::context, ereport, errcode(), errmsg(), ERROR, in_streaming, init_rel_sync_cache(), list_length(), LOGICALREP_PROTO_MAX_VERSION_NUM, LOGICALREP_PROTO_MIN_VERSION_NUM, LOGICALREP_PROTO_STREAM_VERSION_NUM, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM, NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OutputPluginOptions::output_type, palloc0(), parse_output_parameters(), PGOutputData::protocol_version, publication_invalidation_cb(), PGOutputData::publication_names, PUBLICATIONOID, PGOutputData::publications, publications_valid, PGOutputData::streaming, LogicalDecodingContext::streaming, PGOutputData::two_phase, LogicalDecodingContext::twophase, and LogicalDecodingContext::twophase_opt_given.

Referenced by _PG_output_plugin_init().

294 {
295  PGOutputData *data = palloc0(sizeof(PGOutputData));
296 
297  /* Create our memory context for private allocations. */
299  "logical replication output context",
301 
302  ctx->output_plugin_private = data;
303 
304  /* This plugin uses binary protocol. */
306 
307  /*
308  * This is replication start and not slot initialization.
309  *
310  * Parse and validate options passed by the client.
311  */
312  if (!is_init)
313  {
314  /* Parse the params and ERROR if we see any we don't recognize */
316 
317  /* Check if we support requested protocol */
319  ereport(ERROR,
320  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
321  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
323 
325  ereport(ERROR,
326  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
327  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
329 
330  if (list_length(data->publication_names) < 1)
331  ereport(ERROR,
332  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
333  errmsg("publication_names parameter missing")));
334 
335  /*
336  * Decide whether to enable streaming. It is disabled by default, in
337  * which case we just update the flag in decoding context. Otherwise
338  * we only allow it with sufficient version of the protocol, and when
339  * the output plugin supports it.
340  */
341  if (!data->streaming)
342  ctx->streaming = false;
344  ereport(ERROR,
345  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
346  errmsg("requested proto_version=%d does not support streaming, need %d or higher",
348  else if (!ctx->streaming)
349  ereport(ERROR,
350  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
351  errmsg("streaming requested, but not supported by output plugin")));
352 
353  /* Also remember we're currently not streaming any transaction. */
354  in_streaming = false;
355 
356  /*
357  * Here, we just check whether the two-phase option is passed by
358  * plugin and decide whether to enable it at later point of time. It
359  * remains enabled if the previous start-up has done so. But we only
360  * allow the option to be passed in with sufficient version of the
361  * protocol, and when the output plugin supports it.
362  */
363  if (!data->two_phase)
364  ctx->twophase_opt_given = false;
366  ereport(ERROR,
367  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
368  errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
370  else if (!ctx->twophase)
371  ereport(ERROR,
372  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
373  errmsg("two-phase commit requested, but not supported by output plugin")));
374  else
375  ctx->twophase_opt_given = true;
376 
377  /* Init publication state. */
378  data->publications = NIL;
379  publications_valid = false;
382  (Datum) 0);
383 
384  /* Initialize relation schema cache. */
386  }
387  else
388  {
389  /*
390  * Disable the streaming and prepared transactions during the slot
391  * initialization mode.
392  */
393  ctx->streaming = false;
394  ctx->twophase = false;
395  }
396 }
#define NIL
Definition: pg_list.h:65
#define AllocSetContextCreate
Definition: memutils.h:173
static bool publications_valid
Definition: pgoutput.c:77
static bool in_streaming
Definition: pgoutput.c:78
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:909
static void parse_output_parameters(List *options, PGOutputData *data)
Definition: pgoutput.c:184
MemoryContext context
Definition: pgoutput.h:20
int errcode(int sqlerrcode)
Definition: elog.c:698
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: logical.h:35
List * output_plugin_options
Definition: logical.h:58
OutputPluginOutputType output_type
Definition: output_plugin.h:28
#define ERROR
Definition: elog.h:46
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:195
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:35
List * publication_names
Definition: pgoutput.h:25
#define LOGICALREP_PROTO_MAX_VERSION_NUM
Definition: logicalproto.h:39
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1498
void * palloc0(Size size)
Definition: mcxt.c:1093
uintptr_t Datum
Definition: postgres.h:411
#define ereport(elevel,...)
Definition: elog.h:157
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:37
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:38
static int list_length(const List *l)
Definition: pg_list.h:149
List * publications
Definition: pgoutput.h:26
bool two_phase
Definition: pgoutput.h:30
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:1049
int errmsg(const char *fmt,...)
Definition: elog.c:909
bool streaming
Definition: pgoutput.h:28
MemoryContext CacheMemoryContext
Definition: mcxt.c:51
uint32 protocol_version
Definition: pgoutput.h:24

◆ pgoutput_stream_abort()

static void pgoutput_stream_abort ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  abort_lsn 
)
static

Definition at line 974 of file pgoutput.c.

References Assert, cleanup_rel_sync_cache(), in_streaming, logicalrep_write_stream_abort(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, ReorderBufferTXN::toptxn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

977 {
978  ReorderBufferTXN *toptxn;
979 
980  /*
981  * The abort should happen outside streaming block, even for streamed
982  * transactions. The transaction has to be marked as streamed, though.
983  */
985 
986  /* determine the toplevel transaction */
987  toptxn = (txn->toptxn) ? txn->toptxn : txn;
988 
989  Assert(rbtxn_is_streamed(toptxn));
990 
991  OutputPluginPrepareWrite(ctx, true);
992  logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
993  OutputPluginWrite(ctx, true);
994 
995  cleanup_rel_sync_cache(toptxn->xid, false);
996 }
static bool in_streaming
Definition: pgoutput.c:78
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid)
Definition: proto.c:1135
#define rbtxn_is_streamed(txn)
struct ReorderBufferTXN * toptxn
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
#define Assert(condition)
Definition: c.h:804
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:1259
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70

◆ pgoutput_stream_commit()

static void pgoutput_stream_commit ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 1003 of file pgoutput.c.

References Assert, cleanup_rel_sync_cache(), in_streaming, logicalrep_write_stream_commit(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), rbtxn_is_streamed, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

1006 {
1007  /*
1008  * The commit should happen outside streaming block, even for streamed
1009  * transactions. The transaction has to be marked as streamed, though.
1010  */
1011  Assert(!in_streaming);
1012  Assert(rbtxn_is_streamed(txn));
1013 
1015 
1016  OutputPluginPrepareWrite(ctx, true);
1017  logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1018  OutputPluginWrite(ctx, true);
1019 
1020  cleanup_rel_sync_cache(txn->xid, true);
1021 }
static bool in_streaming
Definition: pgoutput.c:78
#define rbtxn_is_streamed(txn)
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:1084
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:674
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
#define Assert(condition)
Definition: c.h:804
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:1259
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70

◆ pgoutput_stream_prepare_txn()

static void pgoutput_stream_prepare_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  prepare_lsn 
)
static

Definition at line 1029 of file pgoutput.c.

References Assert, logicalrep_write_stream_prepare(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), OutputPluginWrite(), and rbtxn_is_streamed.

Referenced by _PG_output_plugin_init().

1032 {
1033  Assert(rbtxn_is_streamed(txn));
1034 
1036  OutputPluginPrepareWrite(ctx, true);
1037  logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1038  OutputPluginWrite(ctx, true);
1039 }
#define rbtxn_is_streamed(txn)
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:674
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:350
#define Assert(condition)
Definition: c.h:804
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70

◆ pgoutput_stream_start()

static void pgoutput_stream_start ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 924 of file pgoutput.c.

References Assert, in_streaming, InvalidRepOriginId, InvalidXLogRecPtr, logicalrep_write_stream_start(), ReorderBufferTXN::origin_id, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), rbtxn_is_streamed, send_repl_origin(), and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

926 {
927  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
928 
929  /* we can't nest streaming of transactions */
931 
932  /*
933  * If we already sent the first stream for this transaction then don't
934  * send the origin id in the subsequent streams.
935  */
936  if (rbtxn_is_streamed(txn))
937  send_replication_origin = false;
938 
939  OutputPluginPrepareWrite(ctx, !send_replication_origin);
941 
943  send_replication_origin);
944 
945  OutputPluginWrite(ctx, true);
946 
947  /* we're streaming a chunk of transaction now */
948  in_streaming = true;
949 }
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
RepOriginId origin_id
static bool in_streaming
Definition: pgoutput.c:78
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:1041
#define rbtxn_is_streamed(txn)
static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
Definition: pgoutput.c:1384
TransactionId xid
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
#define Assert(condition)
Definition: c.h:804
#define InvalidRepOriginId
Definition: origin.h:33
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70

◆ pgoutput_stream_stop()

static void pgoutput_stream_stop ( struct LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 955 of file pgoutput.c.

References Assert, in_streaming, logicalrep_write_stream_stop(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

957 {
958  /* we should be streaming a trasanction */
960 
961  OutputPluginPrepareWrite(ctx, true);
963  OutputPluginWrite(ctx, true);
964 
965  /* we've stopped streaming a transaction */
966  in_streaming = false;
967 }
static bool in_streaming
Definition: pgoutput.c:78
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:1075
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
#define Assert(condition)
Definition: c.h:804
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70

◆ pgoutput_truncate()

static void pgoutput_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 769 of file pgoutput.c.

References PGOutputData::context, ReorderBufferChange::data, get_rel_sync_entry(), i, in_streaming, InvalidTransactionId, is_publishable_relation(), logicalrep_write_truncate(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), palloc0(), RelationData::rd_rel, RelationGetRelid, RelationSyncEntry::relid, ReorderBufferChange::truncate, ReorderBufferChange::txn, and ReorderBufferTXN::xid.

Referenced by _PG_output_plugin_init().

771 {
773  MemoryContext old;
774  RelationSyncEntry *relentry;
775  int i;
776  int nrelids;
777  Oid *relids;
779 
780  /* Remember the xid for the change in streaming mode. See pgoutput_change. */
781  if (in_streaming)
782  xid = change->txn->xid;
783 
784  old = MemoryContextSwitchTo(data->context);
785 
786  relids = palloc0(nrelations * sizeof(Oid));
787  nrelids = 0;
788 
789  for (i = 0; i < nrelations; i++)
790  {
791  Relation relation = relations[i];
792  Oid relid = RelationGetRelid(relation);
793 
794  if (!is_publishable_relation(relation))
795  continue;
796 
797  relentry = get_rel_sync_entry(data, relid);
798 
799  if (!relentry->pubactions.pubtruncate)
800  continue;
801 
802  /*
803  * Don't send partitions if the publication wants to send only the
804  * root tables through it.
805  */
806  if (relation->rd_rel->relispartition &&
807  relentry->publish_as_relid != relid)
808  continue;
809 
810  relids[nrelids++] = relid;
811  maybe_send_schema(ctx, change, relation, relentry);
812  }
813 
814  if (nrelids > 0)
815  {
816  OutputPluginPrepareWrite(ctx, true);
818  xid,
819  nrelids,
820  relids,
821  change->data.truncate.cascade,
822  change->data.truncate.restart_seqs);
823  OutputPluginWrite(ctx, true);
824  }
825 
828 }
static bool in_streaming
Definition: pgoutput.c:78
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:570
uint32 TransactionId
Definition: c.h:587
MemoryContext context
Definition: pgoutput.h:20
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:88
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:75
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
bool is_publishable_relation(Relation rel)
#define InvalidTransactionId
Definition: transam.h:31
union ReorderBufferChange::@97 data
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:1116
void * palloc0(Size size)
Definition: mcxt.c:1093
TransactionId xid
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:496
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
struct ReorderBufferChange::@97::@99 truncate
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70
int i
#define RelationGetRelid(relation)
Definition: rel.h:477

◆ publication_invalidation_cb()

static void publication_invalidation_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 909 of file pgoutput.c.

References publications_valid, and rel_sync_cache_publication_cb().

Referenced by pgoutput_startup().

910 {
911  publications_valid = false;
912 
913  /*
914  * Also invalidate per-relation cache so that next time the filtering info
915  * is checked it will be updated with the new publication settings.
916  */
917  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
918 }
static bool publications_valid
Definition: pgoutput.c:77
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1349
void * arg

◆ rel_sync_cache_publication_cb()

static void rel_sync_cache_publication_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 1349 of file pgoutput.c.

References hash_seq_init(), hash_seq_search(), RelationSyncEntry::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, PublicationActions::pubtruncate, PublicationActions::pubupdate, RelationSyncEntry::replicate_valid, and status().

Referenced by init_rel_sync_cache(), and publication_invalidation_cb().

1350 {
1352  RelationSyncEntry *entry;
1353 
1354  /*
1355  * We can get here if the plugin was used in SQL interface as the
1356  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
1357  * is no way to unregister the relcache invalidation callback.
1358  */
1359  if (RelationSyncCache == NULL)
1360  return;
1361 
1362  /*
1363  * There is no way to find which entry in our cache the hash belongs to so
1364  * mark the whole cache as invalid.
1365  */
1366  hash_seq_init(&status, RelationSyncCache);
1367  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
1368  {
1369  entry->replicate_valid = false;
1370 
1371  /*
1372  * There might be some relations dropped from the publication so we
1373  * don't need to publish the changes for them.
1374  */
1375  entry->pubactions.pubinsert = false;
1376  entry->pubactions.pubupdate = false;
1377  entry->pubactions.pubdelete = false;
1378  entry->pubactions.pubtruncate = false;
1379  }
1380 }
bool replicate_valid
Definition: pgoutput.c:115
static HTAB * RelationSyncCache
Definition: pgoutput.c:136
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1436
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1426
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
PublicationActions pubactions
Definition: pgoutput.c:116

◆ rel_sync_cache_relation_cb()

static void rel_sync_cache_relation_cb ( Datum  arg,
Oid  relid 
)
static

Definition at line 1295 of file pgoutput.c.

References free_conversion_map(), FreeTupleDesc(), HASH_FIND, hash_search(), TupleConversionMap::indesc, list_free(), RelationSyncEntry::map, NIL, TupleConversionMap::outdesc, RelationSyncEntry::schema_sent, and RelationSyncEntry::streamed_txns.

Referenced by init_rel_sync_cache().

1296 {
1297  RelationSyncEntry *entry;
1298 
1299  /*
1300  * We can get here if the plugin was used in SQL interface as the
1301  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
1302  * is no way to unregister the relcache invalidation callback.
1303  */
1304  if (RelationSyncCache == NULL)
1305  return;
1306 
1307  /*
1308  * Nobody keeps pointers to entries in this hash table around outside
1309  * logical decoding callback calls - but invalidation events can come in
1310  * *during* a callback if we access the relcache in the callback. Because
1311  * of that we must mark the cache entry as invalid but not remove it from
1312  * the hash while it could still be referenced, then prune it at a later
1313  * safe point.
1314  *
1315  * Getting invalidations for relations that aren't in the table is
1316  * entirely normal, since there's no way to unregister for an invalidation
1317  * event. So we don't care if it's found or not.
1318  */
1319  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
1320  HASH_FIND, NULL);
1321 
1322  /*
1323  * Reset schema sent status as the relation definition may have changed.
1324  * Also free any objects that depended on the earlier definition.
1325  */
1326  if (entry != NULL)
1327  {
1328  entry->schema_sent = false;
1329  list_free(entry->streamed_txns);
1330  entry->streamed_txns = NIL;
1331  if (entry->map)
1332  {
1333  /*
1334  * Must free the TupleDescs contained in the map explicitly,
1335  * because free_conversion_map() doesn't.
1336  */
1337  FreeTupleDesc(entry->map->indesc);
1338  FreeTupleDesc(entry->map->outdesc);
1339  free_conversion_map(entry->map);
1340  }
1341  entry->map = NULL;
1342  }
1343 }
List * streamed_txns
Definition: pgoutput.c:112
#define NIL
Definition: pg_list.h:65
TupleDesc indesc
Definition: tupconvert.h:26
TupleDesc outdesc
Definition: tupconvert.h:27
TupleConversionMap * map
Definition: pgoutput.c:132
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:954
void free_conversion_map(TupleConversionMap *map)
Definition: tupconvert.c:284
static HTAB * RelationSyncCache
Definition: pgoutput.c:136
void FreeTupleDesc(TupleDesc tupdesc)
Definition: tupdesc.c:309
void list_free(List *list)
Definition: list.c:1391

◆ send_relation_and_attrs()

static void send_relation_and_attrs ( Relation  relation,
TransactionId  xid,
LogicalDecodingContext ctx 
)
static

Definition at line 589 of file pgoutput.c.

References FirstGenbkiObjectId, i, logicalrep_write_rel(), logicalrep_write_typ(), TupleDescData::natts, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetDescr, and TupleDescAttr.

Referenced by maybe_send_schema().

591 {
592  TupleDesc desc = RelationGetDescr(relation);
593  int i;
594 
595  /*
596  * Write out type info if needed. We do that only for user-created types.
597  * We use FirstGenbkiObjectId as the cutoff, so that we only consider
598  * objects with hand-assigned OIDs to be "built in", not for instance any
599  * function or type defined in the information_schema. This is important
600  * because only hand-assigned OIDs can be expected to remain stable across
601  * major versions.
602  */
603  for (i = 0; i < desc->natts; i++)
604  {
605  Form_pg_attribute att = TupleDescAttr(desc, i);
606 
607  if (att->attisdropped || att->attgenerated)
608  continue;
609 
610  if (att->atttypid < FirstGenbkiObjectId)
611  continue;
612 
613  OutputPluginPrepareWrite(ctx, false);
614  logicalrep_write_typ(ctx->out, xid, att->atttypid);
615  OutputPluginWrite(ctx, false);
616  }
617 
618  OutputPluginPrepareWrite(ctx, false);
619  logicalrep_write_rel(ctx->out, xid, relation);
620  OutputPluginWrite(ctx, false);
621 }
#define RelationGetDescr(relation)
Definition: rel.h:503
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
Definition: proto.c:654
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:207
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
#define FirstGenbkiObjectId
Definition: transam.h:195
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70
int i
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:708

◆ send_repl_origin()

static void send_repl_origin ( LogicalDecodingContext ctx,
RepOriginId  origin_id,
XLogRecPtr  origin_lsn,
bool  send_origin 
)
static

Definition at line 1384 of file pgoutput.c.

References logicalrep_write_origin(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and replorigin_by_oid().

Referenced by pgoutput_begin_prepare_txn(), pgoutput_begin_txn(), and pgoutput_stream_start().

1386 {
1387  if (send_origin)
1388  {
1389  char *origin;
1390 
1391  /*----------
1392  * XXX: which behaviour do we want here?
1393  *
1394  * Alternatives:
1395  * - don't send origin message if origin name not found
1396  * (that's what we do now)
1397  * - throw error - that will break replication, not good
1398  * - send some special "unknown" origin
1399  *----------
1400  */
1401  if (replorigin_by_oid(origin_id, true, &origin))
1402  {
1403  /* Message boundary */
1404  OutputPluginWrite(ctx, false);
1405  OutputPluginPrepareWrite(ctx, true);
1406 
1407  logicalrep_write_origin(ctx->out, origin, origin_lsn);
1408  }
1409  }
1410 }
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:371
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:449
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:648
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:661
StringInfo out
Definition: logical.h:70

◆ set_schema_sent_in_streamed_txn()

static void set_schema_sent_in_streamed_txn ( RelationSyncEntry entry,
TransactionId  xid 
)
static

Definition at line 1095 of file pgoutput.c.

References CacheMemoryContext, lappend_int(), MemoryContextSwitchTo(), and RelationSyncEntry::streamed_txns.

Referenced by maybe_send_schema().

1096 {
1097  MemoryContext oldctx;
1098 
1100 
1101  entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
1102 
1103  MemoryContextSwitchTo(oldctx);
1104 }
List * streamed_txns
Definition: pgoutput.c:112
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
List * lappend_int(List *list, int datum)
Definition: list.c:354
MemoryContext CacheMemoryContext
Definition: mcxt.c:51

Variable Documentation

◆ in_streaming

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 31 of file pgoutput.c.

◆ publications_valid

bool publications_valid
static

Definition at line 77 of file pgoutput.c.

Referenced by get_rel_sync_entry(), pgoutput_startup(), and publication_invalidation_cb().

◆ RelationSyncCache

HTAB* RelationSyncCache = NULL
static

Definition at line 136 of file pgoutput.c.