PostgreSQL Source Code  git master
pgoutput.c File Reference
#include "postgres.h"
#include "catalog/pg_publication.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
#include "utils/inval.h"
#include "utils/int8.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
Include dependency graph for pgoutput.c:

Go to the source code of this file.

Data Structures

struct  RelationSyncEntry
 

Typedefs

typedef struct RelationSyncEntry RelationSyncEntry
 

Functions

void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void pgoutput_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pgoutput_shutdown (LogicalDecodingContext *ctx)
 
static void pgoutput_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
 
static bool pgoutput_origin_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static List * LoadPublications (List *pubnames)
 
static void publication_invalidation_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void init_rel_sync_cache (MemoryContext decoding_context)
 
static RelationSyncEntry * get_rel_sync_entry (PGOutputData *data, Oid relid)
 
static void rel_sync_cache_relation_cb (Datum arg, Oid relid)
 
static void rel_sync_cache_publication_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void parse_output_parameters (List *options, uint32 *protocol_version, List **publication_names)
 

Variables

 PG_MODULE_MAGIC
 
static bool publications_valid
 
static HTAB * RelationSyncCache = NULL
 

Typedef Documentation

◆ RelationSyncEntry

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks * cb)

Definition at line 74 of file pgoutput.c.

References AssertVariableIsOfType, OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::filter_by_origin_cb, pgoutput_begin_txn(), pgoutput_change(), pgoutput_commit_txn(), pgoutput_origin_filter(), pgoutput_shutdown(), pgoutput_startup(), OutputPluginCallbacks::shutdown_cb, and OutputPluginCallbacks::startup_cb.

75 {
77 
84 }
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:258
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:74
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:381
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:244
LogicalDecodeCommitCB commit_cb
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:35
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:147
LogicalDecodeChangeCB change_cb
Definition: output_plugin.h:99
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:368
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStartupCB startup_cb
Definition: output_plugin.h:97
LogicalDecodeBeginCB begin_cb
Definition: output_plugin.h:98
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:208
LogicalDecodeFilterByOriginCB filter_by_origin_cb
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:823

◆ get_rel_sync_entry()

static RelationSyncEntry * get_rel_sync_entry ( PGOutputData * data,
Oid  relid 
)
static

Definition at line 465 of file pgoutput.c.

References Publication::alltables, Assert, CacheMemoryContext, get_rel_name(), get_rel_relkind(), GetRelationPublications(), HASH_ENTER, hash_search(), lfirst, list_free(), list_free_deep(), list_member_oid(), LoadPublications(), MemoryContextSwitchTo(), Publication::oid, RelationSyncEntry::pubactions, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, PGOutputData::publication_names, PGOutputData::publications, publications_valid, PublicationActions::pubupdate, RelationSyncEntry::relid, RELKIND_RELATION, RelationSyncEntry::replicate_valid, and RelationSyncEntry::schema_sent.

Referenced by pgoutput_change().

466 {
467  RelationSyncEntry *entry;
468  bool found;
469  MemoryContext oldctx;
470 
471  Assert(RelationSyncCache != NULL);
472 
473  /* Find cached function info, creating if not found */
476  (void *) &relid,
477  HASH_ENTER, &found);
478  MemoryContextSwitchTo(oldctx);
479  Assert(entry != NULL);
480 
481  /* Not found means schema wasn't sent */
482  if (!found || !entry->replicate_valid)
483  {
484  List *pubids = GetRelationPublications(relid);
485  ListCell *lc;
486 
487  /* Reload publications if needed before use. */
488  if (!publications_valid)
489  {
491  if (data->publications)
493 
495  MemoryContextSwitchTo(oldctx);
496  publications_valid = true;
497  }
498 
499  /*
500  * Build publication cache. We can't use one provided by relcache as
501  * relcache considers all publications given relation is in, but here
502  * we only need to consider ones that the subscriber requested.
503  */
504  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
505  entry->pubactions.pubdelete = false;
506 
507  foreach(lc, data->publications)
508  {
509  Publication *pub = lfirst(lc);
510 
511  /*
512  * Skip tables that look like they are from a heap rewrite (see
513  * make_new_heap()). We need to skip them because the subscriber
514  * won't have a table by that name to receive the data. That
515  * means we won't ship the new data in, say, an added column with
516  * a DEFAULT, but if the user applies the same DDL manually on the
517  * subscriber, then this will work out for them.
518  *
519  * We only need to consider the alltables case, because such a
520  * transient heap won't be an explicit member of a publication.
521  */
522  if (pub->alltables)
523  {
524  char *relname = get_rel_name(relid);
525  unsigned int u;
526  int n;
527 
528  if (sscanf(relname, "pg_temp_%u%n", &u, &n) == 1 &&
529  relname[n] == '\0')
530  {
532  break;
533  }
534  }
535 
536  if (pub->alltables || list_member_oid(pubids, pub->oid))
537  {
538  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
539  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
540  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
541  }
542 
543  if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
544  entry->pubactions.pubdelete)
545  break;
546  }
547 
548  list_free(pubids);
549 
550  entry->replicate_valid = true;
551  }
552 
553  if (!found)
554  entry->schema_sent = false;
555 
556  return entry;
557 }
PublicationActions pubactions
static bool publications_valid
Definition: pgoutput.c:46
bool replicate_valid
Definition: pgoutput.c:57
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1805
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:904
void list_free_deep(List *list)
Definition: list.c:1147
List * GetRelationPublications(Oid relid)
List * publication_names
Definition: pgoutput.h:26
static HTAB * RelationSyncCache
Definition: pgoutput.c:62
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:505
#define Assert(condition)
Definition: c.h:688
#define lfirst(lc)
Definition: pg_list.h:106
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:394
List * publications
Definition: pgoutput.h:27
void list_free(List *list)
Definition: list.c:1133
#define RELKIND_RELATION
Definition: pg_class.h:160
Definition: pg_list.h:45
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1730
MemoryContext CacheMemoryContext
Definition: mcxt.c:46
PublicationActions pubactions
Definition: pgoutput.c:58

◆ init_rel_sync_cache()

static void init_rel_sync_cache ( MemoryContext  decoding_context)
static

Definition at line 433 of file pgoutput.c.

References Assert, CacheRegisterRelcacheCallback(), CacheRegisterSyscacheCallback(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, MemoryContextSwitchTo(), MemSet, PUBLICATIONRELMAP, rel_sync_cache_publication_cb(), and rel_sync_cache_relation_cb().

Referenced by pgoutput_startup().

434 {
435  HASHCTL ctl;
436  MemoryContext old_ctxt;
437 
438  if (RelationSyncCache != NULL)
439  return;
440 
441  /* Make a new hash table for the cache */
442  MemSet(&ctl, 0, sizeof(ctl));
443  ctl.keysize = sizeof(Oid);
444  ctl.entrysize = sizeof(RelationSyncEntry);
445  ctl.hcxt = cachectx;
446 
447  old_ctxt = MemoryContextSwitchTo(cachectx);
448  RelationSyncCache = hash_create("logical replication output relation cache",
449  128, &ctl,
451  (void) MemoryContextSwitchTo(old_ctxt);
452 
453  Assert(RelationSyncCache != NULL);
454 
458  (Datum) 0);
459 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:563
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:73
#define MemSet(start, val, len)
Definition: c.h:897
unsigned int Oid
Definition: postgres_ext.h:31
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:601
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1431
static HTAB * RelationSyncCache
Definition: pgoutput.c:62
#define HASH_BLOBS
Definition: hsearch.h:88
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1389
uintptr_t Datum
Definition: postgres.h:365
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
#define Assert(condition)
Definition: c.h:688

◆ LoadPublications()

static List * LoadPublications ( List * pubnames)
static

Definition at line 394 of file pgoutput.c.

References GetPublicationByName(), lappend(), lfirst, and NIL.

Referenced by get_rel_sync_entry().

395 {
396  List *result = NIL;
397  ListCell *lc;
398 
399  foreach(lc, pubnames)
400  {
401  char *pubname = (char *) lfirst(lc);
402  Publication *pub = GetPublicationByName(pubname, false);
403 
404  result = lappend(result, pub);
405  }
406 
407  return result;
408 }
#define NIL
Definition: pg_list.h:69
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
List * lappend(List *list, void *datum)
Definition: list.c:128
#define lfirst(lc)
Definition: pg_list.h:106
Definition: pg_list.h:45

◆ parse_output_parameters()

static void parse_output_parameters ( List * options,
uint32 * protocol_version,
List **  publication_names 
)
static

Definition at line 87 of file pgoutput.c.

References DefElem::arg, Assert, DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, IsA, lfirst, PG_UINT32_MAX, scanint8(), SplitIdentifierString(), and strVal.

Referenced by pgoutput_startup().

89 {
90  ListCell *lc;
91  bool protocol_version_given = false;
92  bool publication_names_given = false;
93 
94  foreach(lc, options)
95  {
96  DefElem *defel = (DefElem *) lfirst(lc);
97 
98  Assert(defel->arg == NULL || IsA(defel->arg, String));
99 
100  /* Check each param, whether or not we recognize it */
101  if (strcmp(defel->defname, "proto_version") == 0)
102  {
103  int64 parsed;
104 
105  if (protocol_version_given)
106  ereport(ERROR,
107  (errcode(ERRCODE_SYNTAX_ERROR),
108  errmsg("conflicting or redundant options")));
109  protocol_version_given = true;
110 
111  if (!scanint8(strVal(defel->arg), true, &parsed))
112  ereport(ERROR,
113  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
114  errmsg("invalid proto_version")));
115 
116  if (parsed > PG_UINT32_MAX || parsed < 0)
117  ereport(ERROR,
118  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
119  errmsg("proto_version \"%s\" out of range",
120  strVal(defel->arg))));
121 
122  *protocol_version = (uint32) parsed;
123  }
124  else if (strcmp(defel->defname, "publication_names") == 0)
125  {
126  if (publication_names_given)
127  ereport(ERROR,
128  (errcode(ERRCODE_SYNTAX_ERROR),
129  errmsg("conflicting or redundant options")));
130  publication_names_given = true;
131 
132  if (!SplitIdentifierString(strVal(defel->arg), ',',
133  publication_names))
134  ereport(ERROR,
135  (errcode(ERRCODE_INVALID_NAME),
136  errmsg("invalid publication_names syntax")));
137  }
138  else
139  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
140  }
141 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:564
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PG_UINT32_MAX
Definition: c.h:398
#define ERROR
Definition: elog.h:43
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3263
unsigned int uint32
Definition: c.h:314
#define ereport(elevel, rest)
Definition: elog.h:122
Node * arg
Definition: parsenodes.h:728
#define Assert(condition)
Definition: c.h:688
#define lfirst(lc)
Definition: pg_list.h:106
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * defname
Definition: parsenodes.h:727
#define elog
Definition: elog.h:219
bool scanint8(const char *str, bool errorOK, int64 *result)
Definition: int8.c:55

◆ pgoutput_begin_txn()

static void pgoutput_begin_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN *  txn 
)
static

Definition at line 208 of file pgoutput.c.

References InvalidRepOriginId, logicalrep_write_begin(), logicalrep_write_origin(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and replorigin_by_oid().

Referenced by _PG_output_plugin_init().

209 {
210  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
211 
212  OutputPluginPrepareWrite(ctx, !send_replication_origin);
213  logicalrep_write_begin(ctx->out, txn);
214 
215  if (send_replication_origin)
216  {
217  char *origin;
218 
219  /* Message boundary */
220  OutputPluginWrite(ctx, false);
221  OutputPluginPrepareWrite(ctx, true);
222 
223  /*----------
224  * XXX: which behaviour do we want here?
225  *
226  * Alternatives:
227  * - don't send origin message if origin name not found
228  * (that's what we do now)
229  * - throw error - that will break replication, not good
230  * - send some special "unknown" origin
231  *----------
232  */
233  if (replorigin_by_oid(txn->origin_id, true, &origin))
234  logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
235  }
236 
237  OutputPluginWrite(ctx, true);
238 }
RepOriginId origin_id
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:110
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:434
XLogRecPtr origin_lsn
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:43
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:496
#define InvalidRepOriginId
Definition: origin.h:34
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:509
StringInfo out
Definition: logical.h:73

◆ pgoutput_change()

static void pgoutput_change ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
Relation  rel,
ReorderBufferChange *  change 
)
static

Definition at line 258 of file pgoutput.c.

References ReorderBufferChange::action, Assert, PGOutputData::context, ReorderBufferChange::data, DEBUG1, elog, FirstNormalObjectId, get_rel_sync_entry(), i, logicalrep_write_delete(), logicalrep_write_insert(), logicalrep_write_rel(), logicalrep_write_typ(), logicalrep_write_update(), MemoryContextReset(), MemoryContextSwitchTo(), tupleDesc::natts, LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetDescr, RelationGetRelid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, ReorderBufferChange::tp, and TupleDescAttr.

Referenced by _PG_output_plugin_init().

260 {
262  MemoryContext old;
263  RelationSyncEntry *relentry;
264 
265  relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
266 
267  /* First check the table filter */
268  switch (change->action)
269  {
271  if (!relentry->pubactions.pubinsert)
272  return;
273  break;
275  if (!relentry->pubactions.pubupdate)
276  return;
277  break;
279  if (!relentry->pubactions.pubdelete)
280  return;
281  break;
282  default:
283  Assert(false);
284  }
285 
286  /* Avoid leaking memory by using and resetting our own context */
287  old = MemoryContextSwitchTo(data->context);
288 
289  /*
290  * Write the relation schema if the current schema haven't been sent yet.
291  */
292  if (!relentry->schema_sent)
293  {
294  TupleDesc desc;
295  int i;
296 
297  desc = RelationGetDescr(relation);
298 
299  /*
300  * Write out type info if needed. We do that only for user created
301  * types.
302  */
303  for (i = 0; i < desc->natts; i++)
304  {
305  Form_pg_attribute att = TupleDescAttr(desc, i);
306 
307  if (att->attisdropped)
308  continue;
309 
310  if (att->atttypid < FirstNormalObjectId)
311  continue;
312 
313  OutputPluginPrepareWrite(ctx, false);
314  logicalrep_write_typ(ctx->out, att->atttypid);
315  OutputPluginWrite(ctx, false);
316  }
317 
318  OutputPluginPrepareWrite(ctx, false);
319  logicalrep_write_rel(ctx->out, relation);
320  OutputPluginWrite(ctx, false);
321  relentry->schema_sent = true;
322  }
323 
324  /* Send the data */
325  switch (change->action)
326  {
328  OutputPluginPrepareWrite(ctx, true);
329  logicalrep_write_insert(ctx->out, relation,
330  &change->data.tp.newtuple->tuple);
331  OutputPluginWrite(ctx, true);
332  break;
334  {
335  HeapTuple oldtuple = change->data.tp.oldtuple ?
336  &change->data.tp.oldtuple->tuple : NULL;
337 
338  OutputPluginPrepareWrite(ctx, true);
339  logicalrep_write_update(ctx->out, relation, oldtuple,
340  &change->data.tp.newtuple->tuple);
341  OutputPluginWrite(ctx, true);
342  break;
343  }
345  if (change->data.tp.oldtuple)
346  {
347  OutputPluginPrepareWrite(ctx, true);
348  logicalrep_write_delete(ctx->out, relation,
349  &change->data.tp.oldtuple->tuple);
350  OutputPluginWrite(ctx, true);
351  }
352  else
353  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
354  break;
355  default:
356  Assert(false);
357  }
358 
359  /* Cleanup */
362 }
void logicalrep_write_typ(StringInfo out, Oid typoid)
Definition: proto.c:349
#define DEBUG1
Definition: elog.h:25
#define RelationGetDescr(relation)
Definition: rel.h:437
MemoryContext context
Definition: pgoutput.h:20
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:90
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:78
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:134
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:77
int natts
Definition: tupdesc.h:79
#define FirstNormalObjectId
Definition: transam.h:94
struct ReorderBufferChange::@102::@103 tp
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
Definition: proto.c:252
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
Definition: proto.c:139
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:465
void logicalrep_write_rel(StringInfo out, Relation rel)
Definition: proto.c:299
union ReorderBufferChange::@102 data
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:496
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
Definition: proto.c:182
#define Assert(condition)
Definition: c.h:688
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:509
StringInfo out
Definition: logical.h:73
int i
#define elog
Definition: elog.h:219
#define RelationGetRelid(relation)
Definition: rel.h:425

◆ pgoutput_commit_txn()

static void pgoutput_commit_txn ( LogicalDecodingContext * ctx,
ReorderBufferTXN * txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 244 of file pgoutput.c.

References logicalrep_write_commit(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

246 {
248 
249  OutputPluginPrepareWrite(ctx, true);
250  logicalrep_write_commit(ctx->out, txn, commit_lsn);
251  OutputPluginWrite(ctx, true);
252 }
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:72
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:522
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:496
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:509
StringInfo out
Definition: logical.h:73

◆ pgoutput_origin_filter()

static bool pgoutput_origin_filter ( LogicalDecodingContext * ctx,
RepOriginId  origin_id 
)
static

Definition at line 368 of file pgoutput.c.

Referenced by _PG_output_plugin_init().

370 {
371  return false;
372 }

◆ pgoutput_shutdown()

static void pgoutput_shutdown ( LogicalDecodingContext * ctx)
static

Definition at line 381 of file pgoutput.c.

References hash_destroy().

Referenced by _PG_output_plugin_init().

382 {
383  if (RelationSyncCache)
384  {
386  RelationSyncCache = NULL;
387  }
388 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:812
static HTAB * RelationSyncCache
Definition: pgoutput.c:62

◆ pgoutput_startup()

static void pgoutput_startup ( LogicalDecodingContext * ctx,
OutputPluginOptions * opt,
bool  is_init 
)
static

Definition at line 147 of file pgoutput.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CacheMemoryContext, CacheRegisterSyscacheCallback(), PGOutputData::context, LogicalDecodingContext::context, ereport, errcode(), errmsg(), ERROR, init_rel_sync_cache(), list_length(), LOGICALREP_PROTO_MIN_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OutputPluginOptions::output_type, palloc0(), parse_output_parameters(), PGOutputData::protocol_version, publication_invalidation_cb(), PGOutputData::publication_names, PUBLICATIONOID, PGOutputData::publications, and publications_valid.

Referenced by _PG_output_plugin_init().

149 {
150  PGOutputData *data = palloc0(sizeof(PGOutputData));
151 
152  /* Create our memory context for private allocations. */
154  "logical replication output context",
156 
157  ctx->output_plugin_private = data;
158 
159  /* This plugin uses binary protocol. */
161 
162  /*
163  * This is replication start and not slot initialization.
164  *
165  * Parse and validate options passed by the client.
166  */
167  if (!is_init)
168  {
169  /* Parse the params and ERROR if we see any we don't recognize */
171  &data->protocol_version,
172  &data->publication_names);
173 
174  /* Check if we support requested protocol */
176  ereport(ERROR,
177  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
178  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
180 
182  ereport(ERROR,
183  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
184  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
186 
187  if (list_length(data->publication_names) < 1)
188  ereport(ERROR,
189  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
190  errmsg("publication_names parameter missing")));
191 
192  /* Init publication state. */
193  data->publications = NIL;
194  publications_valid = false;
197  (Datum) 0);
198 
199  /* Initialize relation schema cache. */
201  }
202 }
#define NIL
Definition: pg_list.h:69
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
static bool publications_valid
Definition: pgoutput.c:46
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:414
MemoryContext context
Definition: pgoutput.h:20
int errcode(int sqlerrcode)
Definition: elog.c:575
void * output_plugin_private
Definition: logical.h:78
MemoryContext context
Definition: logical.h:38
List * output_plugin_options
Definition: logical.h:61
OutputPluginOutputType output_type
Definition: output_plugin.h:28
#define ERROR
Definition: elog.h:43
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:27
List * publication_names
Definition: pgoutput.h:26
#define ereport(elevel, rest)
Definition: elog.h:122
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:165
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1389
void * palloc0(Size size)
Definition: mcxt.c:864
uintptr_t Datum
Definition: postgres.h:365
static int list_length(const List *l)
Definition: pg_list.h:89
static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names)
Definition: pgoutput.c:87
List * publications
Definition: pgoutput.h:27
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:433
int errmsg(const char *fmt,...)
Definition: elog.c:797
MemoryContext CacheMemoryContext
Definition: mcxt.c:46
uint32 protocol_version
Definition: pgoutput.h:24

◆ publication_invalidation_cb()

static void publication_invalidation_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 414 of file pgoutput.c.

References publications_valid, and rel_sync_cache_publication_cb().

Referenced by pgoutput_startup().

415 {
416  publications_valid = false;
417 
418  /*
419  * Also invalidate per-relation cache so that next time the filtering info
420  * is checked it will be updated with the new publication settings.
421  */
422  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
423 }
static bool publications_valid
Definition: pgoutput.c:46
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:601
void * arg

◆ rel_sync_cache_publication_cb()

static void rel_sync_cache_publication_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 601 of file pgoutput.c.

References hash_seq_init(), hash_seq_search(), RelationSyncEntry::replicate_valid, and status().

Referenced by init_rel_sync_cache(), and publication_invalidation_cb().

602 {
604  RelationSyncEntry *entry;
605 
606  /*
607  * We can get here if the plugin was used in SQL interface as the
608  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
609  * is no way to unregister the relcache invalidation callback.
610  */
611  if (RelationSyncCache == NULL)
612  return;
613 
614  /*
615  * There is no way to find which entry in our cache the hash belongs to so
616  * mark the whole cache as invalid.
617  */
619  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
620  entry->replicate_valid = false;
621 }
bool replicate_valid
Definition: pgoutput.c:57
static HTAB * RelationSyncCache
Definition: pgoutput.c:62
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1387
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1377
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225

◆ rel_sync_cache_relation_cb()

static void rel_sync_cache_relation_cb ( Datum  arg,
Oid  relid 
)
static

Definition at line 563 of file pgoutput.c.

References HASH_FIND, hash_search(), and RelationSyncEntry::schema_sent.

Referenced by init_rel_sync_cache().

564 {
565  RelationSyncEntry *entry;
566 
567  /*
568  * We can get here if the plugin was used in SQL interface as the
569  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
570  * is no way to unregister the relcache invalidation callback.
571  */
572  if (RelationSyncCache == NULL)
573  return;
574 
575  /*
576  * Nobody keeps pointers to entries in this hash table around outside
577  * logical decoding callback calls - but invalidation events can come in
578  * *during* a callback if we access the relcache in the callback. Because
579  * of that we must mark the cache entry as invalid but not remove it from
580  * the hash while it could still be referenced, then prune it at a later
581  * safe point.
582  *
583  * Getting invalidations for relations that aren't in the table is
584  * entirely normal, since there's no way to unregister for an invalidation
585  * event. So we don't care if it's found or not.
586  */
587  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
588  HASH_FIND, NULL);
589 
590  /*
591  * Reset schema sent status as the relation definition may have changed.
592  */
593  if (entry != NULL)
594  entry->schema_sent = false;
595 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:904
static HTAB * RelationSyncCache
Definition: pgoutput.c:62

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 29 of file pgoutput.c.

◆ publications_valid

bool publications_valid
static

Definition at line 46 of file pgoutput.c.

Referenced by get_rel_sync_entry(), pgoutput_startup(), and publication_invalidation_cb().

◆ RelationSyncCache

HTAB* RelationSyncCache = NULL
static

Definition at line 62 of file pgoutput.c.