PostgreSQL Source Code  git master
pgoutput.c File Reference
#include "postgres.h"
#include "catalog/pg_publication.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
#include "utils/inval.h"
#include "utils/int8.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
Include dependency graph for pgoutput.c:

Go to the source code of this file.

Data Structures

struct  RelationSyncEntry
 

Typedefs

typedef struct RelationSyncEntry RelationSyncEntry
 

Functions

void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void pgoutput_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pgoutput_shutdown (LogicalDecodingContext *ctx)
 
static void pgoutput_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
 
static void pgoutput_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static bool pgoutput_origin_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static ListLoadPublications (List *pubnames)
 
static void publication_invalidation_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void init_rel_sync_cache (MemoryContext decoding_context)
 
static RelationSyncEntryget_rel_sync_entry (PGOutputData *data, Oid relid)
 
static void rel_sync_cache_relation_cb (Datum arg, Oid relid)
 
static void rel_sync_cache_publication_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void parse_output_parameters (List *options, uint32 *protocol_version, List **publication_names)
 
static void maybe_send_schema (LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry)
 

Variables

 PG_MODULE_MAGIC
 
static bool publications_valid
 
static HTABRelationSyncCache = NULL
 

Typedef Documentation

◆ RelationSyncEntry

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks cb)

Definition at line 76 of file pgoutput.c.

References AssertVariableIsOfType, OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::filter_by_origin_cb, pgoutput_begin_txn(), pgoutput_change(), pgoutput_commit_txn(), pgoutput_origin_filter(), pgoutput_shutdown(), pgoutput_startup(), pgoutput_truncate(), OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, and OutputPluginCallbacks::truncate_cb.

77 {
79 
87 }
LogicalDecodeTruncateCB truncate_cb
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:301
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:76
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:439
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:247
LogicalDecodeCommitCB commit_cb
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:150
LogicalDecodeChangeCB change_cb
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:426
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeBeginCB begin_cb
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:378
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:211
LogicalDecodeFilterByOriginCB filter_by_origin_cb
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:834

◆ get_rel_sync_entry()

static RelationSyncEntry * get_rel_sync_entry ( PGOutputData data,
Oid  relid 
)
static

Definition at line 523 of file pgoutput.c.

References Publication::alltables, Assert, CacheMemoryContext, GetRelationPublications(), HASH_ENTER, hash_search(), lfirst, list_free(), list_free_deep(), list_member_oid(), LoadPublications(), MemoryContextSwitchTo(), Publication::oid, RelationSyncEntry::pubactions, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, PGOutputData::publication_names, PGOutputData::publications, publications_valid, PublicationActions::pubtruncate, PublicationActions::pubupdate, RelationSyncEntry::relid, RelationSyncEntry::replicate_valid, and RelationSyncEntry::schema_sent.

Referenced by pgoutput_change(), and pgoutput_truncate().

524 {
525  RelationSyncEntry *entry;
526  bool found;
527  MemoryContext oldctx;
528 
529  Assert(RelationSyncCache != NULL);
530 
531  /* Find cached function info, creating if not found */
534  (void *) &relid,
535  HASH_ENTER, &found);
536  MemoryContextSwitchTo(oldctx);
537  Assert(entry != NULL);
538 
539  /* Not found means schema wasn't sent */
540  if (!found || !entry->replicate_valid)
541  {
542  List *pubids = GetRelationPublications(relid);
543  ListCell *lc;
544 
545  /* Reload publications if needed before use. */
546  if (!publications_valid)
547  {
549  if (data->publications)
551 
553  MemoryContextSwitchTo(oldctx);
554  publications_valid = true;
555  }
556 
557  /*
558  * Build publication cache. We can't use one provided by relcache as
559  * relcache considers all publications given relation is in, but here
560  * we only need to consider ones that the subscriber requested.
561  */
562  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
563  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
564 
565  foreach(lc, data->publications)
566  {
567  Publication *pub = lfirst(lc);
568 
569  if (pub->alltables || list_member_oid(pubids, pub->oid))
570  {
571  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
572  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
573  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
575  }
576 
577  if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
578  entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
579  break;
580  }
581 
582  list_free(pubids);
583 
584  entry->replicate_valid = true;
585  }
586 
587  if (!found)
588  entry->schema_sent = false;
589 
590  return entry;
591 }
PublicationActions pubactions
static bool publications_valid
Definition: pgoutput.c:48
bool replicate_valid
Definition: pgoutput.c:59
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
void list_free_deep(List *list)
Definition: list.c:1147
List * GetRelationPublications(Oid relid)
List * publication_names
Definition: pgoutput.h:26
static HTAB * RelationSyncCache
Definition: pgoutput.c:64
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:505
#define Assert(condition)
Definition: c.h:699
#define lfirst(lc)
Definition: pg_list.h:106
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:452
List * publications
Definition: pgoutput.h:27
void list_free(List *list)
Definition: list.c:1133
Definition: pg_list.h:45
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
PublicationActions pubactions
Definition: pgoutput.c:60

◆ init_rel_sync_cache()

static void init_rel_sync_cache ( MemoryContext  decoding_context)
static

Definition at line 491 of file pgoutput.c.

References Assert, CacheRegisterRelcacheCallback(), CacheRegisterSyscacheCallback(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, MemoryContextSwitchTo(), MemSet, PUBLICATIONRELMAP, rel_sync_cache_publication_cb(), and rel_sync_cache_relation_cb().

Referenced by pgoutput_startup().

492 {
493  HASHCTL ctl;
494  MemoryContext old_ctxt;
495 
496  if (RelationSyncCache != NULL)
497  return;
498 
499  /* Make a new hash table for the cache */
500  MemSet(&ctl, 0, sizeof(ctl));
501  ctl.keysize = sizeof(Oid);
502  ctl.entrysize = sizeof(RelationSyncEntry);
503  ctl.hcxt = cachectx;
504 
505  old_ctxt = MemoryContextSwitchTo(cachectx);
506  RelationSyncCache = hash_create("logical replication output relation cache",
507  128, &ctl,
509  (void) MemoryContextSwitchTo(old_ctxt);
510 
511  Assert(RelationSyncCache != NULL);
512 
516  (Datum) 0);
517 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:597
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:73
#define MemSet(start, val, len)
Definition: c.h:908
unsigned int Oid
Definition: postgres_ext.h:31
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:635
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1431
static HTAB * RelationSyncCache
Definition: pgoutput.c:64
#define HASH_BLOBS
Definition: hsearch.h:88
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1389
uintptr_t Datum
Definition: postgres.h:365
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:316
Size keysize
Definition: hsearch.h:72
#define Assert(condition)
Definition: c.h:699

◆ LoadPublications()

static List * LoadPublications ( List pubnames)
static

Definition at line 452 of file pgoutput.c.

References GetPublicationByName(), lappend(), lfirst, and NIL.

Referenced by get_rel_sync_entry().

453 {
454  List *result = NIL;
455  ListCell *lc;
456 
457  foreach(lc, pubnames)
458  {
459  char *pubname = (char *) lfirst(lc);
460  Publication *pub = GetPublicationByName(pubname, false);
461 
462  result = lappend(result, pub);
463  }
464 
465  return result;
466 }
#define NIL
Definition: pg_list.h:69
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
List * lappend(List *list, void *datum)
Definition: list.c:128
#define lfirst(lc)
Definition: pg_list.h:106
Definition: pg_list.h:45

◆ maybe_send_schema()

static void maybe_send_schema ( LogicalDecodingContext ctx,
Relation  relation,
RelationSyncEntry relentry 
)
static

Definition at line 261 of file pgoutput.c.

References FirstNormalObjectId, i, logicalrep_write_rel(), logicalrep_write_typ(), tupleDesc::natts, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetDescr, RelationSyncEntry::schema_sent, and TupleDescAttr.

Referenced by pgoutput_change(), and pgoutput_truncate().

263 {
264  if (!relentry->schema_sent)
265  {
266  TupleDesc desc;
267  int i;
268 
269  desc = RelationGetDescr(relation);
270 
271  /*
272  * Write out type info if needed. We do that only for user created
273  * types.
274  */
275  for (i = 0; i < desc->natts; i++)
276  {
277  Form_pg_attribute att = TupleDescAttr(desc, i);
278 
279  if (att->attisdropped)
280  continue;
281 
282  if (att->atttypid < FirstNormalObjectId)
283  continue;
284 
285  OutputPluginPrepareWrite(ctx, false);
286  logicalrep_write_typ(ctx->out, att->atttypid);
287  OutputPluginWrite(ctx, false);
288  }
289 
290  OutputPluginPrepareWrite(ctx, false);
291  logicalrep_write_rel(ctx->out, relation);
292  OutputPluginWrite(ctx, false);
293  relentry->schema_sent = true;
294  }
295 }
void logicalrep_write_typ(StringInfo out, Oid typoid)
Definition: proto.c:404
#define RelationGetDescr(relation)
Definition: rel.h:433
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:93
int natts
Definition: tupdesc.h:82
#define FirstNormalObjectId
Definition: transam.h:94
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:197
void logicalrep_write_rel(StringInfo out, Relation rel)
Definition: proto.c:354
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:503
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:516
StringInfo out
Definition: logical.h:73
int i

◆ parse_output_parameters()

static void parse_output_parameters ( List options,
uint32 protocol_version,
List **  publication_names 
)
static

Definition at line 90 of file pgoutput.c.

References DefElem::arg, Assert, DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, IsA, lfirst, PG_UINT32_MAX, scanint8(), SplitIdentifierString(), and strVal.

Referenced by pgoutput_startup().

92 {
93  ListCell *lc;
94  bool protocol_version_given = false;
95  bool publication_names_given = false;
96 
97  foreach(lc, options)
98  {
99  DefElem *defel = (DefElem *) lfirst(lc);
100 
101  Assert(defel->arg == NULL || IsA(defel->arg, String));
102 
103  /* Check each param, whether or not we recognize it */
104  if (strcmp(defel->defname, "proto_version") == 0)
105  {
106  int64 parsed;
107 
108  if (protocol_version_given)
109  ereport(ERROR,
110  (errcode(ERRCODE_SYNTAX_ERROR),
111  errmsg("conflicting or redundant options")));
112  protocol_version_given = true;
113 
114  if (!scanint8(strVal(defel->arg), true, &parsed))
115  ereport(ERROR,
116  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117  errmsg("invalid proto_version")));
118 
119  if (parsed > PG_UINT32_MAX || parsed < 0)
120  ereport(ERROR,
121  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
122  errmsg("proto_version \"%s\" out of range",
123  strVal(defel->arg))));
124 
125  *protocol_version = (uint32) parsed;
126  }
127  else if (strcmp(defel->defname, "publication_names") == 0)
128  {
129  if (publication_names_given)
130  ereport(ERROR,
131  (errcode(ERRCODE_SYNTAX_ERROR),
132  errmsg("conflicting or redundant options")));
133  publication_names_given = true;
134 
135  if (!SplitIdentifierString(strVal(defel->arg), ',',
136  publication_names))
137  ereport(ERROR,
138  (errcode(ERRCODE_INVALID_NAME),
139  errmsg("invalid publication_names syntax")));
140  }
141  else
142  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
143  }
144 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:568
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PG_UINT32_MAX
Definition: c.h:409
#define ERROR
Definition: elog.h:43
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3290
unsigned int uint32
Definition: c.h:325
#define ereport(elevel, rest)
Definition: elog.h:122
Node * arg
Definition: parsenodes.h:731
#define Assert(condition)
Definition: c.h:699
#define lfirst(lc)
Definition: pg_list.h:106
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * defname
Definition: parsenodes.h:730
#define elog
Definition: elog.h:219
bool scanint8(const char *str, bool errorOK, int64 *result)
Definition: int8.c:55

◆ pgoutput_begin_txn()

static void pgoutput_begin_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn 
)
static

Definition at line 211 of file pgoutput.c.

References InvalidRepOriginId, logicalrep_write_begin(), logicalrep_write_origin(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and replorigin_by_oid().

Referenced by _PG_output_plugin_init().

212 {
213  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
214 
215  OutputPluginPrepareWrite(ctx, !send_replication_origin);
216  logicalrep_write_begin(ctx->out, txn);
217 
218  if (send_replication_origin)
219  {
220  char *origin;
221 
222  /* Message boundary */
223  OutputPluginWrite(ctx, false);
224  OutputPluginPrepareWrite(ctx, true);
225 
226  /*----------
227  * XXX: which behaviour do we want here?
228  *
229  * Alternatives:
230  * - don't send origin message if origin name not found
231  * (that's what we do now)
232  * - throw error - that will break replication, not good
233  * - send some special "unknown" origin
234  *----------
235  */
236  if (replorigin_by_oid(txn->origin_id, true, &origin))
237  logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
238  }
239 
240  OutputPluginWrite(ctx, true);
241 }
RepOriginId origin_id
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:113
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:434
XLogRecPtr origin_lsn
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:46
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:503
#define InvalidRepOriginId
Definition: origin.h:34
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:516
StringInfo out
Definition: logical.h:73

◆ pgoutput_change()

static void pgoutput_change ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
Relation  rel,
ReorderBufferChange change 
)
static

Definition at line 301 of file pgoutput.c.

References ReorderBufferChange::action, Assert, PGOutputData::context, ReorderBufferChange::data, DEBUG1, elog, get_rel_sync_entry(), is_publishable_relation(), logicalrep_write_delete(), logicalrep_write_insert(), logicalrep_write_update(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetRelid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, and ReorderBufferChange::tp.

Referenced by _PG_output_plugin_init().

303 {
305  MemoryContext old;
306  RelationSyncEntry *relentry;
307 
308  if (!is_publishable_relation(relation))
309  return;
310 
311  relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
312 
313  /* First check the table filter */
314  switch (change->action)
315  {
317  if (!relentry->pubactions.pubinsert)
318  return;
319  break;
321  if (!relentry->pubactions.pubupdate)
322  return;
323  break;
325  if (!relentry->pubactions.pubdelete)
326  return;
327  break;
328  default:
329  Assert(false);
330  }
331 
332  /* Avoid leaking memory by using and resetting our own context */
333  old = MemoryContextSwitchTo(data->context);
334 
335  maybe_send_schema(ctx, relation, relentry);
336 
337  /* Send the data */
338  switch (change->action)
339  {
341  OutputPluginPrepareWrite(ctx, true);
342  logicalrep_write_insert(ctx->out, relation,
343  &change->data.tp.newtuple->tuple);
344  OutputPluginWrite(ctx, true);
345  break;
347  {
348  HeapTuple oldtuple = change->data.tp.oldtuple ?
349  &change->data.tp.oldtuple->tuple : NULL;
350 
351  OutputPluginPrepareWrite(ctx, true);
352  logicalrep_write_update(ctx->out, relation, oldtuple,
353  &change->data.tp.newtuple->tuple);
354  OutputPluginWrite(ctx, true);
355  break;
356  }
358  if (change->data.tp.oldtuple)
359  {
360  OutputPluginPrepareWrite(ctx, true);
361  logicalrep_write_delete(ctx->out, relation,
362  &change->data.tp.oldtuple->tuple);
363  OutputPluginWrite(ctx, true);
364  }
365  else
366  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
367  break;
368  default:
369  Assert(false);
370  }
371 
372  /* Cleanup */
375 }
#define DEBUG1
Definition: elog.h:25
MemoryContext context
Definition: pgoutput.h:20
static void maybe_send_schema(LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:261
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:78
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:78
struct ReorderBufferChange::@102::@103 tp
bool is_publishable_relation(Relation rel)
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
Definition: proto.c:255
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
Definition: proto.c:142
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:523
union ReorderBufferChange::@102 data
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:503
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
Definition: proto.c:185
#define Assert(condition)
Definition: c.h:699
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:516
StringInfo out
Definition: logical.h:73
#define elog
Definition: elog.h:219
#define RelationGetRelid(relation)
Definition: rel.h:407

◆ pgoutput_commit_txn()

static void pgoutput_commit_txn ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 247 of file pgoutput.c.

References logicalrep_write_commit(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

249 {
251 
252  OutputPluginPrepareWrite(ctx, true);
253  logicalrep_write_commit(ctx->out, txn, commit_lsn);
254  OutputPluginWrite(ctx, true);
255 }
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:75
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:529
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:503
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:516
StringInfo out
Definition: logical.h:73

◆ pgoutput_origin_filter()

static bool pgoutput_origin_filter ( LogicalDecodingContext ctx,
RepOriginId  origin_id 
)
static

Definition at line 426 of file pgoutput.c.

Referenced by _PG_output_plugin_init().

428 {
429  return false;
430 }

◆ pgoutput_shutdown()

static void pgoutput_shutdown ( LogicalDecodingContext ctx)
static

Definition at line 439 of file pgoutput.c.

References hash_destroy().

Referenced by _PG_output_plugin_init().

440 {
441  if (RelationSyncCache)
442  {
444  RelationSyncCache = NULL;
445  }
446 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:814
static HTAB * RelationSyncCache
Definition: pgoutput.c:64

◆ pgoutput_startup()

static void pgoutput_startup ( LogicalDecodingContext ctx,
OutputPluginOptions opt,
bool  is_init 
)
static

Definition at line 150 of file pgoutput.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CacheMemoryContext, CacheRegisterSyscacheCallback(), PGOutputData::context, LogicalDecodingContext::context, ereport, errcode(), errmsg(), ERROR, init_rel_sync_cache(), list_length(), LOGICALREP_PROTO_MIN_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OutputPluginOptions::output_type, palloc0(), parse_output_parameters(), PGOutputData::protocol_version, publication_invalidation_cb(), PGOutputData::publication_names, PUBLICATIONOID, PGOutputData::publications, and publications_valid.

Referenced by _PG_output_plugin_init().

152 {
153  PGOutputData *data = palloc0(sizeof(PGOutputData));
154 
155  /* Create our memory context for private allocations. */
157  "logical replication output context",
159 
160  ctx->output_plugin_private = data;
161 
162  /* This plugin uses binary protocol. */
164 
165  /*
166  * This is replication start and not slot initialization.
167  *
168  * Parse and validate options passed by the client.
169  */
170  if (!is_init)
171  {
172  /* Parse the params and ERROR if we see any we don't recognize */
174  &data->protocol_version,
175  &data->publication_names);
176 
177  /* Check if we support requested protocol */
179  ereport(ERROR,
180  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
181  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
183 
185  ereport(ERROR,
186  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
187  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
189 
190  if (list_length(data->publication_names) < 1)
191  ereport(ERROR,
192  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
193  errmsg("publication_names parameter missing")));
194 
195  /* Init publication state. */
196  data->publications = NIL;
197  publications_valid = false;
200  (Datum) 0);
201 
202  /* Initialize relation schema cache. */
204  }
205 }
#define NIL
Definition: pg_list.h:69
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
static bool publications_valid
Definition: pgoutput.c:48
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:472
MemoryContext context
Definition: pgoutput.h:20
int errcode(int sqlerrcode)
Definition: elog.c:575
void * output_plugin_private
Definition: logical.h:78
MemoryContext context
Definition: logical.h:38
List * output_plugin_options
Definition: logical.h:61
OutputPluginOutputType output_type
Definition: output_plugin.h:28
#define ERROR
Definition: elog.h:43
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:27
List * publication_names
Definition: pgoutput.h:26
#define ereport(elevel, rest)
Definition: elog.h:122
#define AllocSetContextCreate(parent, name, allocparams)
Definition: memutils.h:170
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1389
void * palloc0(Size size)
Definition: mcxt.c:955
uintptr_t Datum
Definition: postgres.h:365
static int list_length(const List *l)
Definition: pg_list.h:89
static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names)
Definition: pgoutput.c:90
List * publications
Definition: pgoutput.h:27
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:491
int errmsg(const char *fmt,...)
Definition: elog.c:797
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
uint32 protocol_version
Definition: pgoutput.h:24

◆ pgoutput_truncate()

static void pgoutput_truncate ( LogicalDecodingContext ctx,
ReorderBufferTXN txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange change 
)
static

Definition at line 378 of file pgoutput.c.

References PGOutputData::context, ReorderBufferChange::data, get_rel_sync_entry(), i, is_publishable_relation(), logicalrep_write_truncate(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), palloc0(), RelationGetRelid, RelationSyncEntry::relid, and ReorderBufferChange::truncate.

Referenced by _PG_output_plugin_init().

380 {
382  MemoryContext old;
383  RelationSyncEntry *relentry;
384  int i;
385  int nrelids;
386  Oid *relids;
387 
388  old = MemoryContextSwitchTo(data->context);
389 
390  relids = palloc0(nrelations * sizeof(Oid));
391  nrelids = 0;
392 
393  for (i = 0; i < nrelations; i++)
394  {
395  Relation relation = relations[i];
396  Oid relid = RelationGetRelid(relation);
397 
398  if (!is_publishable_relation(relation))
399  continue;
400 
401  relentry = get_rel_sync_entry(data, relid);
402 
403  if (!relentry->pubactions.pubtruncate)
404  continue;
405 
406  relids[nrelids++] = relid;
407  maybe_send_schema(ctx, relation, relentry);
408  }
409 
410  OutputPluginPrepareWrite(ctx, true);
412  nrelids,
413  relids,
414  change->data.truncate.cascade,
415  change->data.truncate.restart_seqs);
416  OutputPluginWrite(ctx, true);
417 
420 }
struct ReorderBufferChange::@102::@104 truncate
MemoryContext context
Definition: pgoutput.h:20
static void maybe_send_schema(LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:261
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:78
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
unsigned int Oid
Definition: postgres_ext.h:31
bool is_publishable_relation(Relation rel)
void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:302
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:523
void * palloc0(Size size)
Definition: mcxt.c:955
union ReorderBufferChange::@102 data
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:503
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:516
StringInfo out
Definition: logical.h:73
int i
#define RelationGetRelid(relation)
Definition: rel.h:407

◆ publication_invalidation_cb()

static void publication_invalidation_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 472 of file pgoutput.c.

References publications_valid, and rel_sync_cache_publication_cb().

Referenced by pgoutput_startup().

473 {
474  publications_valid = false;
475 
476  /*
477  * Also invalidate per-relation cache so that next time the filtering info
478  * is checked it will be updated with the new publication settings.
479  */
480  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
481 }
static bool publications_valid
Definition: pgoutput.c:48
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:635
void * arg

◆ rel_sync_cache_publication_cb()

static void rel_sync_cache_publication_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 635 of file pgoutput.c.

References hash_seq_init(), hash_seq_search(), RelationSyncEntry::replicate_valid, and status().

Referenced by init_rel_sync_cache(), and publication_invalidation_cb().

636 {
638  RelationSyncEntry *entry;
639 
640  /*
641  * We can get here if the plugin was used in SQL interface as the
642  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
643  * is no way to unregister the relcache invalidation callback.
644  */
645  if (RelationSyncCache == NULL)
646  return;
647 
648  /*
649  * There is no way to find which entry in our cache the hash belongs to so
650  * mark the whole cache as invalid.
651  */
653  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
654  entry->replicate_valid = false;
655 }
bool replicate_valid
Definition: pgoutput.c:59
static HTAB * RelationSyncCache
Definition: pgoutput.c:64
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1389
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1379
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225

◆ rel_sync_cache_relation_cb()

static void rel_sync_cache_relation_cb ( Datum  arg,
Oid  relid 
)
static

Definition at line 597 of file pgoutput.c.

References HASH_FIND, hash_search(), and RelationSyncEntry::schema_sent.

Referenced by init_rel_sync_cache().

598 {
599  RelationSyncEntry *entry;
600 
601  /*
602  * We can get here if the plugin was used in SQL interface as the
603  * RelSchemaSyncCache is destroyed when the decoding finishes, but there
604  * is no way to unregister the relcache invalidation callback.
605  */
606  if (RelationSyncCache == NULL)
607  return;
608 
609  /*
610  * Nobody keeps pointers to entries in this hash table around outside
611  * logical decoding callback calls - but invalidation events can come in
612  * *during* a callback if we access the relcache in the callback. Because
613  * of that we must mark the cache entry as invalid but not remove it from
614  * the hash while it could still be referenced, then prune it at a later
615  * safe point.
616  *
617  * Getting invalidations for relations that aren't in the table is
618  * entirely normal, since there's no way to unregister for an invalidation
619  * event. So we don't care if it's found or not.
620  */
621  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
622  HASH_FIND, NULL);
623 
624  /*
625  * Reset schema sent status as the relation definition may have changed.
626  */
627  if (entry != NULL)
628  entry->schema_sent = false;
629 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:906
static HTAB * RelationSyncCache
Definition: pgoutput.c:64

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 28 of file pgoutput.c.

◆ publications_valid

bool publications_valid
static

Definition at line 48 of file pgoutput.c.

Referenced by get_rel_sync_entry(), pgoutput_startup(), and publication_invalidation_cb().

◆ RelationSyncCache

HTAB* RelationSyncCache = NULL
static

Definition at line 64 of file pgoutput.c.