PostgreSQL Source Code  git master
pgoutput.c File Reference
#include "postgres.h"
#include "catalog/pg_publication.h"
#include "fmgr.h"
#include "replication/logical.h"
#include "replication/logicalproto.h"
#include "replication/origin.h"
#include "replication/pgoutput.h"
#include "utils/int8.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
Include dependency graph for pgoutput.c:

Go to the source code of this file.

Data Structures

struct  RelationSyncEntry
 

Typedefs

typedef struct RelationSyncEntry RelationSyncEntry
 

Functions

void _PG_output_plugin_init (OutputPluginCallbacks *cb)
 
static void pgoutput_startup (LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
 
static void pgoutput_shutdown (LogicalDecodingContext *ctx)
 
static void pgoutput_begin_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 
static void pgoutput_commit_txn (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
 
static void pgoutput_change (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
 
static void pgoutput_truncate (LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
 
static bool pgoutput_origin_filter (LogicalDecodingContext *ctx, RepOriginId origin_id)
 
static List * LoadPublications (List *pubnames)
 
static void publication_invalidation_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void init_rel_sync_cache (MemoryContext decoding_context)
 
static RelationSyncEntry * get_rel_sync_entry (PGOutputData *data, Oid relid)
 
static void rel_sync_cache_relation_cb (Datum arg, Oid relid)
 
static void rel_sync_cache_publication_cb (Datum arg, int cacheid, uint32 hashvalue)
 
static void parse_output_parameters (List *options, uint32 *protocol_version, List **publication_names)
 
static void maybe_send_schema (LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry)
 

Variables

 PG_MODULE_MAGIC
 
static bool publications_valid
 
static HTAB * RelationSyncCache = NULL
 

Typedef Documentation

◆ RelationSyncEntry

Function Documentation

◆ _PG_output_plugin_init()

void _PG_output_plugin_init ( OutputPluginCallbacks *  cb)

Definition at line 80 of file pgoutput.c.

References AssertVariableIsOfType, OutputPluginCallbacks::begin_cb, OutputPluginCallbacks::change_cb, OutputPluginCallbacks::commit_cb, OutputPluginCallbacks::filter_by_origin_cb, pgoutput_begin_txn(), pgoutput_change(), pgoutput_commit_txn(), pgoutput_origin_filter(), pgoutput_shutdown(), pgoutput_startup(), pgoutput_truncate(), OutputPluginCallbacks::shutdown_cb, OutputPluginCallbacks::startup_cb, and OutputPluginCallbacks::truncate_cb.

81 {
83 
91 }
LogicalDecodeTruncateCB truncate_cb
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change)
Definition: pgoutput.c:309
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:80
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:457
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:251
LogicalDecodeCommitCB commit_cb
void(* LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb)
Definition: output_plugin.h:36
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:154
LogicalDecodeChangeCB change_cb
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:444
LogicalDecodeShutdownCB shutdown_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeBeginCB begin_cb
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:386
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:215
LogicalDecodeFilterByOriginCB filter_by_origin_cb
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:897

◆ get_rel_sync_entry()

static RelationSyncEntry * get_rel_sync_entry ( PGOutputData *  data,
Oid  relid 
)
static

Definition at line 546 of file pgoutput.c.

References Publication::alltables, Assert, CacheMemoryContext, GetRelationPublications(), HASH_ENTER, hash_search(), lfirst, list_free(), list_free_deep(), list_member_oid(), LoadPublications(), MemoryContextSwitchTo(), Publication::oid, RelationSyncEntry::pubactions, Publication::pubactions, PublicationActions::pubdelete, PublicationActions::pubinsert, PGOutputData::publication_names, PGOutputData::publications, publications_valid, PublicationActions::pubtruncate, PublicationActions::pubupdate, RelationSyncEntry::relid, RelationSyncEntry::replicate_valid, and RelationSyncEntry::schema_sent.

Referenced by pgoutput_change(), and pgoutput_truncate().

547 {
548  RelationSyncEntry *entry;
549  bool found;
550  MemoryContext oldctx;
551 
552  Assert(RelationSyncCache != NULL);
553 
554  /* Find cached relation sync entry, creating if not found */
557  (void *) &relid,
558  HASH_ENTER, &found);
559  MemoryContextSwitchTo(oldctx);
560  Assert(entry != NULL);
561 
562  /* Not found means schema wasn't sent */
563  if (!found || !entry->replicate_valid)
564  {
565  List *pubids = GetRelationPublications(relid);
566  ListCell *lc;
567 
568  /* Reload publications if needed before use. */
569  if (!publications_valid)
570  {
572  if (data->publications)
574 
576  MemoryContextSwitchTo(oldctx);
577  publications_valid = true;
578  }
579 
580  /*
581  * Build publication cache. We can't use one provided by relcache as
582  * relcache considers all publications given relation is in, but here
583  * we only need to consider ones that the subscriber requested.
584  */
585  entry->pubactions.pubinsert = entry->pubactions.pubupdate =
586  entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
587 
588  foreach(lc, data->publications)
589  {
590  Publication *pub = lfirst(lc);
591 
592  if (pub->alltables || list_member_oid(pubids, pub->oid))
593  {
594  entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
595  entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
596  entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
598  }
599 
600  if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
601  entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
602  break;
603  }
604 
605  list_free(pubids);
606 
607  entry->replicate_valid = true;
608  }
609 
610  if (!found)
611  entry->schema_sent = false;
612 
613  return entry;
614 }
PublicationActions pubactions
static bool publications_valid
Definition: pgoutput.c:47
bool replicate_valid
Definition: pgoutput.c:63
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:907
void list_free_deep(List *list)
Definition: list.c:1391
List * GetRelationPublications(Oid relid)
List * publication_names
Definition: pgoutput.h:26
static HTAB * RelationSyncCache
Definition: pgoutput.c:68
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:675
#define Assert(condition)
Definition: c.h:738
#define lfirst(lc)
Definition: pg_list.h:190
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:470
List * publications
Definition: pgoutput.h:27
void list_free(List *list)
Definition: list.c:1377
List
Definition: pg_list.h:50
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
PublicationActions pubactions
Definition: pgoutput.c:64

◆ init_rel_sync_cache()

static void init_rel_sync_cache ( MemoryContext  decoding_context)
static

Definition at line 509 of file pgoutput.c.

References Assert, CacheRegisterRelcacheCallback(), CacheRegisterSyscacheCallback(), HASHCTL::entrysize, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASHCTL::hcxt, HASHCTL::keysize, MemoryContextSwitchTo(), MemSet, PUBLICATIONRELMAP, rel_sync_cache_publication_cb(), and rel_sync_cache_relation_cb().

Referenced by pgoutput_startup().

510 {
511  HASHCTL ctl;
512  MemoryContext old_ctxt;
513 
514  if (RelationSyncCache != NULL)
515  return;
516 
517  /* Make a new hash table for the cache */
518  MemSet(&ctl, 0, sizeof(ctl));
519  ctl.keysize = sizeof(Oid);
520  ctl.entrysize = sizeof(RelationSyncEntry);
521  ctl.hcxt = cachectx;
522 
523  old_ctxt = MemoryContextSwitchTo(cachectx);
524  RelationSyncCache = hash_create("logical replication output relation cache",
525  128, &ctl,
527  (void) MemoryContextSwitchTo(old_ctxt);
528 
529  Assert(RelationSyncCache != NULL);
530 
534  (Datum) 0);
535 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:620
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
Size entrysize
Definition: hsearch.h:73
#define MemSet(start, val, len)
Definition: c.h:971
unsigned int Oid
Definition: postgres_ext.h:31
struct RelationSyncEntry RelationSyncEntry
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:658
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1468
static HTAB * RelationSyncCache
Definition: pgoutput.c:68
#define HASH_BLOBS
Definition: hsearch.h:88
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
uintptr_t Datum
Definition: postgres.h:367
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:317
Size keysize
Definition: hsearch.h:72
#define Assert(condition)
Definition: c.h:738

◆ LoadPublications()

static List * LoadPublications ( List *  pubnames)
static

Definition at line 470 of file pgoutput.c.

References GetPublicationByName(), lappend(), lfirst, and NIL.

Referenced by get_rel_sync_entry().

471 {
472  List *result = NIL;
473  ListCell *lc;
474 
475  foreach(lc, pubnames)
476  {
477  char *pubname = (char *) lfirst(lc);
478  Publication *pub = GetPublicationByName(pubname, false);
479 
480  result = lappend(result, pub);
481  }
482 
483  return result;
484 }
#define NIL
Definition: pg_list.h:65
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
List * lappend(List *list, void *datum)
Definition: list.c:322
#define lfirst(lc)
Definition: pg_list.h:190
List
Definition: pg_list.h:50

◆ maybe_send_schema()

static void maybe_send_schema ( LogicalDecodingContext *  ctx,
Relation  relation,
RelationSyncEntry *  relentry 
)
static

Definition at line 265 of file pgoutput.c.

References FirstGenbkiObjectId, i, logicalrep_write_rel(), logicalrep_write_typ(), TupleDescData::natts, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetDescr, RelationSyncEntry::schema_sent, and TupleDescAttr.

Referenced by pgoutput_change(), and pgoutput_truncate().

267 {
268  if (!relentry->schema_sent)
269  {
270  TupleDesc desc;
271  int i;
272 
273  desc = RelationGetDescr(relation);
274 
275  /*
276  * Write out type info if needed. We do that only for user-created
277  * types. We use FirstGenbkiObjectId as the cutoff, so that we only
278  * consider objects with hand-assigned OIDs to be "built in", not for
279  * instance any function or type defined in the information_schema.
280  * This is important because only hand-assigned OIDs can be expected
281  * to remain stable across major versions.
282  */
283  for (i = 0; i < desc->natts; i++)
284  {
285  Form_pg_attribute att = TupleDescAttr(desc, i);
286 
287  if (att->attisdropped || att->attgenerated)
288  continue;
289 
290  if (att->atttypid < FirstGenbkiObjectId)
291  continue;
292 
293  OutputPluginPrepareWrite(ctx, false);
294  logicalrep_write_typ(ctx->out, att->atttypid);
295  OutputPluginWrite(ctx, false);
296  }
297 
298  OutputPluginPrepareWrite(ctx, false);
299  logicalrep_write_rel(ctx->out, relation);
300  OutputPluginWrite(ctx, false);
301  relentry->schema_sent = true;
302  }
303 }
void logicalrep_write_typ(StringInfo out, Oid typoid)
Definition: proto.c:400
#define RelationGetDescr(relation)
Definition: rel.h:461
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:193
void logicalrep_write_rel(StringInfo out, Relation rel)
Definition: proto.c:350
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:522
#define FirstGenbkiObjectId
Definition: transam.h:139
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:535
StringInfo out
Definition: logical.h:70
int i

◆ parse_output_parameters()

static void parse_output_parameters ( List *  options,
uint32 *  protocol_version,
List **  publication_names 
)
static

Definition at line 94 of file pgoutput.c.

References DefElem::arg, Assert, DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, IsA, lfirst, PG_UINT32_MAX, scanint8(), SplitIdentifierString(), and strVal.

Referenced by pgoutput_startup().

96 {
97  ListCell *lc;
98  bool protocol_version_given = false;
99  bool publication_names_given = false;
100 
101  foreach(lc, options)
102  {
103  DefElem *defel = (DefElem *) lfirst(lc);
104 
105  Assert(defel->arg == NULL || IsA(defel->arg, String));
106 
107  /* Check each param, whether or not we recognize it */
108  if (strcmp(defel->defname, "proto_version") == 0)
109  {
110  int64 parsed;
111 
112  if (protocol_version_given)
113  ereport(ERROR,
114  (errcode(ERRCODE_SYNTAX_ERROR),
115  errmsg("conflicting or redundant options")));
116  protocol_version_given = true;
117 
118  if (!scanint8(strVal(defel->arg), true, &parsed))
119  ereport(ERROR,
120  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
121  errmsg("invalid proto_version")));
122 
123  if (parsed > PG_UINT32_MAX || parsed < 0)
124  ereport(ERROR,
125  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
126  errmsg("proto_version \"%s\" out of range",
127  strVal(defel->arg))));
128 
129  *protocol_version = (uint32) parsed;
130  }
131  else if (strcmp(defel->defname, "publication_names") == 0)
132  {
133  if (publication_names_given)
134  ereport(ERROR,
135  (errcode(ERRCODE_SYNTAX_ERROR),
136  errmsg("conflicting or redundant options")));
137  publication_names_given = true;
138 
139  if (!SplitIdentifierString(strVal(defel->arg), ',',
140  publication_names))
141  ereport(ERROR,
142  (errcode(ERRCODE_INVALID_NAME),
143  errmsg("invalid publication_names syntax")));
144  }
145  else
146  elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
147  }
148 }
#define IsA(nodeptr, _type_)
Definition: nodes.h:577
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:610
#define PG_UINT32_MAX
Definition: c.h:451
#define ERROR
Definition: elog.h:43
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3672
unsigned int uint32
Definition: c.h:367
Node * arg
Definition: parsenodes.h:731
#define ereport(elevel,...)
Definition: elog.h:144
#define Assert(condition)
Definition: c.h:738
#define lfirst(lc)
Definition: pg_list.h:190
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
char * defname
Definition: parsenodes.h:730
bool scanint8(const char *str, bool errorOK, int64 *result)
Definition: int8.c:55

◆ pgoutput_begin_txn()

static void pgoutput_begin_txn ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn 
)
static

Definition at line 215 of file pgoutput.c.

References InvalidRepOriginId, logicalrep_write_begin(), logicalrep_write_origin(), ReorderBufferTXN::origin_id, ReorderBufferTXN::origin_lsn, LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginWrite(), and replorigin_by_oid().

Referenced by _PG_output_plugin_init().

216 {
217  bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
218 
219  OutputPluginPrepareWrite(ctx, !send_replication_origin);
220  logicalrep_write_begin(ctx->out, txn);
221 
222  if (send_replication_origin)
223  {
224  char *origin;
225 
226  /* Message boundary */
227  OutputPluginWrite(ctx, false);
228  OutputPluginPrepareWrite(ctx, true);
229 
230  /*----------
231  * XXX: which behaviour do we want here?
232  *
233  * Alternatives:
234  * - don't send origin message if origin name not found
235  * (that's what we do now)
236  * - throw error - that will break replication, not good
237  * - send some special "unknown" origin
238  *----------
239  */
240  if (replorigin_by_oid(txn->origin_id, true, &origin))
241  logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
242  }
243 
244  OutputPluginWrite(ctx, true);
245 }
RepOriginId origin_id
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:113
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:431
XLogRecPtr origin_lsn
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:46
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:522
#define InvalidRepOriginId
Definition: origin.h:33
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:535
StringInfo out
Definition: logical.h:70

◆ pgoutput_change()

static void pgoutput_change ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
Relation  rel,
ReorderBufferChange *  change 
)
static

Definition at line 309 of file pgoutput.c.

References ReorderBufferChange::action, Assert, PGOutputData::context, ReorderBufferChange::data, DEBUG1, elog, get_rel_sync_entry(), is_publishable_relation(), logicalrep_write_delete(), logicalrep_write_insert(), logicalrep_write_update(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), RelationGetRelid, REORDER_BUFFER_CHANGE_DELETE, REORDER_BUFFER_CHANGE_INSERT, REORDER_BUFFER_CHANGE_UPDATE, and ReorderBufferChange::tp.

Referenced by _PG_output_plugin_init().

311 {
313  MemoryContext old;
314  RelationSyncEntry *relentry;
315 
316  if (!is_publishable_relation(relation))
317  return;
318 
319  relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
320 
321  /* First check the table filter */
322  switch (change->action)
323  {
325  if (!relentry->pubactions.pubinsert)
326  return;
327  break;
329  if (!relentry->pubactions.pubupdate)
330  return;
331  break;
333  if (!relentry->pubactions.pubdelete)
334  return;
335  break;
336  default:
337  Assert(false);
338  }
339 
340  /* Avoid leaking memory by using and resetting our own context */
341  old = MemoryContextSwitchTo(data->context);
342 
343  maybe_send_schema(ctx, relation, relentry);
344 
345  /* Send the data */
346  switch (change->action)
347  {
349  OutputPluginPrepareWrite(ctx, true);
350  logicalrep_write_insert(ctx->out, relation,
351  &change->data.tp.newtuple->tuple);
352  OutputPluginWrite(ctx, true);
353  break;
355  {
356  HeapTuple oldtuple = change->data.tp.oldtuple ?
357  &change->data.tp.oldtuple->tuple : NULL;
358 
359  OutputPluginPrepareWrite(ctx, true);
360  logicalrep_write_update(ctx->out, relation, oldtuple,
361  &change->data.tp.newtuple->tuple);
362  OutputPluginWrite(ctx, true);
363  break;
364  }
366  if (change->data.tp.oldtuple)
367  {
368  OutputPluginPrepareWrite(ctx, true);
369  logicalrep_write_delete(ctx->out, relation,
370  &change->data.tp.oldtuple->tuple);
371  OutputPluginWrite(ctx, true);
372  }
373  else
374  elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
375  break;
376  default:
377  Assert(false);
378  }
379 
380  /* Cleanup */
383 }
#define DEBUG1
Definition: elog.h:25
MemoryContext context
Definition: pgoutput.h:20
static void maybe_send_schema(LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:265
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:75
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
enum ReorderBufferChangeType action
Definition: reorderbuffer.h:83
bool is_publishable_relation(Relation rel)
void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple)
Definition: proto.c:251
void logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple)
Definition: proto.c:142
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:546
union ReorderBufferChange::@99 data
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:522
void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
Definition: proto.c:181
struct ReorderBufferChange::@99::@100 tp
#define Assert(condition)
Definition: c.h:738
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:535
#define elog(elevel,...)
Definition: elog.h:214
StringInfo out
Definition: logical.h:70
#define RelationGetRelid(relation)
Definition: rel.h:435

◆ pgoutput_commit_txn()

static void pgoutput_commit_txn ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
XLogRecPtr  commit_lsn 
)
static

Definition at line 251 of file pgoutput.c.

References logicalrep_write_commit(), LogicalDecodingContext::out, OutputPluginPrepareWrite(), OutputPluginUpdateProgress(), and OutputPluginWrite().

Referenced by _PG_output_plugin_init().

253 {
255 
256  OutputPluginPrepareWrite(ctx, true);
257  logicalrep_write_commit(ctx->out, txn, commit_lsn);
258  OutputPluginWrite(ctx, true);
259 }
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:75
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
Definition: logical.c:548
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:522
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:535
StringInfo out
Definition: logical.h:70

◆ pgoutput_origin_filter()

static bool pgoutput_origin_filter ( LogicalDecodingContext *  ctx,
RepOriginId  origin_id 
)
static

Definition at line 444 of file pgoutput.c.

Referenced by _PG_output_plugin_init().

446 {
447  return false;
448 }

◆ pgoutput_shutdown()

static void pgoutput_shutdown ( LogicalDecodingContext *  ctx)
static

Definition at line 457 of file pgoutput.c.

References hash_destroy().

Referenced by _PG_output_plugin_init().

458 {
459  if (RelationSyncCache)
460  {
462  RelationSyncCache = NULL;
463  }
464 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:815
static HTAB * RelationSyncCache
Definition: pgoutput.c:68

◆ pgoutput_startup()

static void pgoutput_startup ( LogicalDecodingContext *  ctx,
OutputPluginOptions *  opt,
bool  is_init 
)
static

Definition at line 154 of file pgoutput.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, CacheMemoryContext, CacheRegisterSyscacheCallback(), PGOutputData::context, LogicalDecodingContext::context, ereport, errcode(), errmsg(), ERROR, init_rel_sync_cache(), list_length(), LOGICALREP_PROTO_MIN_VERSION_NUM, LOGICALREP_PROTO_VERSION_NUM, NIL, OUTPUT_PLUGIN_BINARY_OUTPUT, LogicalDecodingContext::output_plugin_options, LogicalDecodingContext::output_plugin_private, OutputPluginOptions::output_type, palloc0(), parse_output_parameters(), PGOutputData::protocol_version, publication_invalidation_cb(), PGOutputData::publication_names, PUBLICATIONOID, PGOutputData::publications, and publications_valid.

Referenced by _PG_output_plugin_init().

156 {
157  PGOutputData *data = palloc0(sizeof(PGOutputData));
158 
159  /* Create our memory context for private allocations. */
161  "logical replication output context",
163 
164  ctx->output_plugin_private = data;
165 
166  /* This plugin uses binary protocol. */
168 
169  /*
170  * This is replication start and not slot initialization.
171  *
172  * Parse and validate options passed by the client.
173  */
174  if (!is_init)
175  {
176  /* Parse the params and ERROR if we see any we don't recognize */
178  &data->protocol_version,
179  &data->publication_names);
180 
181  /* Check if we support requested protocol */
183  ereport(ERROR,
184  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
185  errmsg("client sent proto_version=%d but we only support protocol %d or lower",
187 
189  ereport(ERROR,
190  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
191  errmsg("client sent proto_version=%d but we only support protocol %d or higher",
193 
194  if (list_length(data->publication_names) < 1)
195  ereport(ERROR,
196  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
197  errmsg("publication_names parameter missing")));
198 
199  /* Init publication state. */
200  data->publications = NIL;
201  publications_valid = false;
204  (Datum) 0);
205 
206  /* Initialize relation schema cache. */
208  }
209 }
#define NIL
Definition: pg_list.h:65
#define LOGICALREP_PROTO_VERSION_NUM
Definition: logicalproto.h:28
#define AllocSetContextCreate
Definition: memutils.h:170
static bool publications_valid
Definition: pgoutput.c:47
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:490
MemoryContext context
Definition: pgoutput.h:20
int errcode(int sqlerrcode)
Definition: elog.c:610
void * output_plugin_private
Definition: logical.h:75
MemoryContext context
Definition: logical.h:35
List * output_plugin_options
Definition: logical.h:58
OutputPluginOutputType output_type
Definition: output_plugin.h:28
#define ERROR
Definition: elog.h:43
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:27
List * publication_names
Definition: pgoutput.h:26
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1426
void * palloc0(Size size)
Definition: mcxt.c:980
uintptr_t Datum
Definition: postgres.h:367
#define ereport(elevel,...)
Definition: elog.h:144
static int list_length(const List *l)
Definition: pg_list.h:169
static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names)
Definition: pgoutput.c:94
List * publications
Definition: pgoutput.h:27
static void init_rel_sync_cache(MemoryContext decoding_context)
Definition: pgoutput.c:509
int errmsg(const char *fmt,...)
Definition: elog.c:824
MemoryContext CacheMemoryContext
Definition: mcxt.c:47
uint32 protocol_version
Definition: pgoutput.h:24

◆ pgoutput_truncate()

static void pgoutput_truncate ( LogicalDecodingContext *  ctx,
ReorderBufferTXN *  txn,
int  nrelations,
Relation  relations[],
ReorderBufferChange *  change 
)
static

Definition at line 386 of file pgoutput.c.

References PGOutputData::context, ReorderBufferChange::data, get_rel_sync_entry(), i, is_publishable_relation(), logicalrep_write_truncate(), maybe_send_schema(), MemoryContextReset(), MemoryContextSwitchTo(), LogicalDecodingContext::out, LogicalDecodingContext::output_plugin_private, OutputPluginPrepareWrite(), OutputPluginWrite(), palloc0(), RelationData::rd_rel, RelationGetRelid, RelationSyncEntry::relid, and ReorderBufferChange::truncate.

Referenced by _PG_output_plugin_init().

388 {
390  MemoryContext old;
391  RelationSyncEntry *relentry;
392  int i;
393  int nrelids;
394  Oid *relids;
395 
396  old = MemoryContextSwitchTo(data->context);
397 
398  relids = palloc0(nrelations * sizeof(Oid));
399  nrelids = 0;
400 
401  for (i = 0; i < nrelations; i++)
402  {
403  Relation relation = relations[i];
404  Oid relid = RelationGetRelid(relation);
405 
406  if (!is_publishable_relation(relation))
407  continue;
408 
409  relentry = get_rel_sync_entry(data, relid);
410 
411  if (!relentry->pubactions.pubtruncate)
412  continue;
413 
414  /*
415  * Don't send partitioned tables, because partitions should be sent
416  * instead.
417  */
418  if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
419  continue;
420 
421  relids[nrelids++] = relid;
422  maybe_send_schema(ctx, relation, relentry);
423  }
424 
425  if (nrelids > 0)
426  {
427  OutputPluginPrepareWrite(ctx, true);
429  nrelids,
430  relids,
431  change->data.truncate.cascade,
432  change->data.truncate.restart_seqs);
433  OutputPluginWrite(ctx, true);
434  }
435 
438 }
struct ReorderBufferChange::@99::@101 truncate
MemoryContext context
Definition: pgoutput.h:20
static void maybe_send_schema(LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:265
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * output_plugin_private
Definition: logical.h:75
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:136
Form_pg_class rd_rel
Definition: rel.h:89
unsigned int Oid
Definition: postgres_ext.h:31
bool is_publishable_relation(Relation rel)
void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:298
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid)
Definition: pgoutput.c:546
void * palloc0(Size size)
Definition: mcxt.c:980
union ReorderBufferChange::@99 data
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:522
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:535
StringInfo out
Definition: logical.h:70
int i
#define RelationGetRelid(relation)
Definition: rel.h:435

◆ publication_invalidation_cb()

static void publication_invalidation_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 490 of file pgoutput.c.

References publications_valid, and rel_sync_cache_publication_cb().

Referenced by pgoutput_startup().

491 {
492  publications_valid = false;
493 
494  /*
495  * Also invalidate per-relation cache so that next time the filtering info
496  * is checked it will be updated with the new publication settings.
497  */
498  rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
499 }
static bool publications_valid
Definition: pgoutput.c:47
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:658
void * arg

◆ rel_sync_cache_publication_cb()

static void rel_sync_cache_publication_cb ( Datum  arg,
int  cacheid,
uint32  hashvalue 
)
static

Definition at line 658 of file pgoutput.c.

References hash_seq_init(), hash_seq_search(), RelationSyncEntry::replicate_valid, and status().

Referenced by init_rel_sync_cache(), and publication_invalidation_cb().

659 {
661  RelationSyncEntry *entry;
662 
663  /*
664  * We can get here if the plugin was used in SQL interface as the
665  * RelationSyncCache is destroyed when the decoding finishes, but there
666  * is no way to unregister the syscache invalidation callback.
667  */
668  if (RelationSyncCache == NULL)
669  return;
670 
671  /*
672  * There is no way to find which entry in our cache the hash belongs to so
673  * mark the whole cache as invalid.
674  */
676  while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
677  entry->replicate_valid = false;
678 }
bool replicate_valid
Definition: pgoutput.c:63
static HTAB * RelationSyncCache
Definition: pgoutput.c:68
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1390
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1380
static void status(const char *fmt,...) pg_attribute_printf(1, 2)
Definition: pg_regress.c:225

◆ rel_sync_cache_relation_cb()

static void rel_sync_cache_relation_cb ( Datum  arg,
Oid  relid 
)
static

Definition at line 620 of file pgoutput.c.

References HASH_FIND, hash_search(), and RelationSyncEntry::schema_sent.

Referenced by init_rel_sync_cache().

621 {
622  RelationSyncEntry *entry;
623 
624  /*
625  * We can get here if the plugin was used in SQL interface as the
626  * RelationSyncCache is destroyed when the decoding finishes, but there
627  * is no way to unregister the relcache invalidation callback.
628  */
629  if (RelationSyncCache == NULL)
630  return;
631 
632  /*
633  * Nobody keeps pointers to entries in this hash table around outside
634  * logical decoding callback calls - but invalidation events can come in
635  * *during* a callback if we access the relcache in the callback. Because
636  * of that we must mark the cache entry as invalid but not remove it from
637  * the hash while it could still be referenced, then prune it at a later
638  * safe point.
639  *
640  * Getting invalidations for relations that aren't in the table is
641  * entirely normal, since there's no way to unregister for an invalidation
642  * event. So we don't care if it's found or not.
643  */
644  entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
645  HASH_FIND, NULL);
646 
647  /*
648  * Reset schema sent status as the relation definition may have changed.
649  */
650  if (entry != NULL)
651  entry->schema_sent = false;
652 }
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:907
static HTAB * RelationSyncCache
Definition: pgoutput.c:68

Variable Documentation

◆ PG_MODULE_MAGIC

PG_MODULE_MAGIC

Definition at line 27 of file pgoutput.c.

◆ publications_valid

bool publications_valid
static

Definition at line 47 of file pgoutput.c.

Referenced by get_rel_sync_entry(), pgoutput_startup(), and publication_invalidation_cb().

◆ RelationSyncCache

HTAB* RelationSyncCache = NULL
static

Definition at line 68 of file pgoutput.c.