1/*-------------------------------------------------------------------------
2 *
3 * pgoutput.c
4 * Logical Replication output plugin
5 *
6 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/pgoutput/pgoutput.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#include "access/tupconvert.h"
16#include "catalog/partition.h"
20#include "commands/defrem.h"
22#include "executor/executor.h"
23#include "fmgr.h"
24#include "nodes/makefuncs.h"
26#include "replication/logical.h"
28#include "replication/origin.h"
31#include "utils/builtins.h"
32#include "utils/inval.h"
33#include "utils/lsyscache.h"
34#include "utils/memutils.h"
35#include "utils/rel.h"
36#include "utils/syscache.h"
37#include "utils/varlena.h"
38
39PG_MODULE_MAGIC;
40
41static void pgoutput_startup(LogicalDecodingContext *ctx,
42 OutputPluginOptions *opt, bool is_init);
43static void pgoutput_shutdown(LogicalDecodingContext *ctx);
44static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
45 ReorderBufferTXN *txn);
46static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
47 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
48static void pgoutput_change(LogicalDecodingContext *ctx,
49 ReorderBufferTXN *txn, Relation relation,
50 ReorderBufferChange *change);
51static void pgoutput_truncate(LogicalDecodingContext *ctx,
52 ReorderBufferTXN *txn, int nrelations, Relation relations[],
53 ReorderBufferChange *change);
54static void pgoutput_message(LogicalDecodingContext *ctx,
55 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
56 bool transactional, const char *prefix,
57 Size sz, const char *message);
58static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
59 RepOriginId origin_id);
60static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
61 ReorderBufferTXN *txn);
62static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
63 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
64static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
65 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
66static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
67 ReorderBufferTXN *txn,
68 XLogRecPtr prepare_end_lsn,
69 TimestampTz prepare_time);
70static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
71 ReorderBufferTXN *txn);
72static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
73 ReorderBufferTXN *txn);
74static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
75 ReorderBufferTXN *txn,
76 XLogRecPtr abort_lsn);
77static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
78 ReorderBufferTXN *txn,
79 XLogRecPtr commit_lsn);
80static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
81 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
82
83static bool publications_valid;
84
85static List *LoadPublications(List *pubnames);
86static void publication_invalidation_cb(Datum arg, int cacheid,
87 uint32 hashvalue);
88static void send_repl_origin(LogicalDecodingContext *ctx,
89 RepOriginId origin_id, XLogRecPtr origin_lsn,
90 bool send_origin);
91
92/*
93 * Only 3 publication actions are used for row filtering ("insert", "update",
94 * "delete"). See RelationSyncEntry.exprstate[].
95 */
96enum RowFilterPubAction
97{
98 PUBACTION_INSERT,
99 PUBACTION_UPDATE,
100 PUBACTION_DELETE,
101};
102
103#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
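/*
 * Illustrative note (not part of the original file): because the pubaction
 * values are consecutive and start at zero, they can index a per-action
 * array directly, which is how RelationSyncEntry.exprstate[] is used below.
 * A hypothetical accessor would look like:
 *
 *	static ExprState *
 *	rowfilter_for_action(struct RelationSyncEntry *entry, int pubaction)
 *	{
 *		Assert(pubaction >= 0 && pubaction < NUM_ROWFILTER_PUBACTIONS);
 *		return entry->exprstate[pubaction];
 *	}
 *
 * A NULL element means there is no filter for that action and the change is
 * published unconditionally.
 */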
104
105/*
106 * Entry in the map used to remember which relation schemas we sent.
107 *
108 * The schema_sent flag determines if the current schema record for the
109 * relation (and for its ancestor if publish_as_relid is set) was already
110 * sent to the subscriber (in which case we don't need to send it again).
111 *
112 * The schema cache on the downstream side is, however, updated only at
113 * commit time, and with streamed transactions the commit order may differ
114 * from the order in which the transactions are sent. Also, a (sub)
115 * transaction might get aborted, so we need to send the schema for each
116 * (sub) transaction so that we don't lose the schema information on abort.
117 * To handle this, we maintain a list of xids (streamed_txns) for which we
118 * have already sent the schema.
119 *
120 * For partitions, 'pubactions' considers not only the table's own
121 * publications, but also those of all of its ancestors.
122 */
123typedef struct RelationSyncEntry
124{
125 Oid relid; /* relation oid */
126
127 bool replicate_valid; /* overall validity flag for entry */
128
129 bool schema_sent;
130
131 /*
132 * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
133 * columns and the 'publish_generated_columns' parameter is set to
134 * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
135 * indicating that no generated columns should be published, unless
136 * explicitly specified in the column list.
137 */
138 PublishGencolsType include_gencols_type;
139 List *streamed_txns; /* streamed toplevel transactions with this
140 * schema */
141
142 /* are we publishing this rel? */
143 PublicationActions pubactions;
144
145 /*
146 * ExprState array for row filter. The row filters for different publication
147 * actions cannot always be combined into a single expression: UPDATE and
148 * DELETE filters may only reference columns that are part of the replica
149 * identity index, whereas INSERT filters have no such restriction, so there
150 * is one ExprState per publication action.
151 */
152 ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
153 EState *estate; /* executor state used for row filter */
154 TupleTableSlot *new_slot; /* slot for storing new tuple */
155 TupleTableSlot *old_slot; /* slot for storing old tuple */
156
157 /*
158 * OID of the relation to publish changes as. For a partition, this may
159 * be set to one of its ancestors whose schema will be used when
160 * replicating changes, if publish_via_partition_root is set for the
161 * publication.
162 */
163 Oid publish_as_relid;
164
165 /*
166 * Map used when replicating using an ancestor's schema to convert tuples
167 * from partition's type to the ancestor's; NULL if publish_as_relid is
168 * same as 'relid' or if unnecessary due to partition and the ancestor
169 * having identical TupleDesc.
170 */
171 AttrMap *attrmap;
172
173 /*
174 * Columns included in the publication, or NULL if all columns are
175 * included implicitly. Note that the attnums in this bitmap are not
176 * shifted by FirstLowInvalidHeapAttributeNumber.
177 */
178 Bitmapset *columns;
179
180 /*
181 * Private context to store additional data for this entry - state for the
182 * row filter expressions, column list, etc.
183 */
184 MemoryContext entry_cxt;
185} RelationSyncEntry;
186
187/*
188 * Maintain a per-transaction level variable to track whether the transaction
189 * has sent BEGIN. BEGIN is only sent when the first change in a transaction
190 * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
191 * messages for empty transactions which saves network bandwidth.
192 *
193 * This optimization is not used for prepared transactions because if the
194 * WALSender restarts after prepare of a transaction and before commit prepared
195 * of the same transaction then we won't be able to figure out if we have
196 * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
197 * because we would have lost the in-memory txndata information that was
198 * present prior to the restart. This will result in sending a spurious
199 * COMMIT PREPARED without a corresponding prepared transaction at the
200 * downstream which would lead to an error when it tries to process it.
201 *
202 * XXX We could achieve this optimization by changing protocol to send
203 * additional information so that downstream can detect that the corresponding
204 * prepare has not been sent. However, adding such a check for every
205 * transaction in the downstream could be costly so we might want to do it
206 * optionally.
207 *
208 * We also don't have this optimization for streamed transactions because
209 * they can contain prepared transactions.
210 */
211typedef struct PGOutputTxnData
212{
213 bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
214} PGOutputTxnData;
215
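/*
 * Sketch of how the deferred-BEGIN bookkeeping above plays out across the
 * callbacks defined later in this file (simplified; see the real callbacks
 * for details):
 *
 *	pgoutput_begin_txn():   allocate zeroed PGOutputTxnData, so
 *	                        sent_begin_txn starts out false
 *	pgoutput_change():      if (!txndata->sent_begin_txn)
 *	                            pgoutput_send_begin(ctx, txn);
 *	pgoutput_commit_txn():  if (!txndata->sent_begin_txn)
 *	                            skip sending COMMIT for the empty transaction
 */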
216/* Map used to remember which relation schemas we sent. */
217static HTAB *RelationSyncCache = NULL;
218
219static void init_rel_sync_cache(MemoryContext cachectx);
220static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
221static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
222 Relation relation);
223static void send_relation_and_attrs(Relation relation, TransactionId xid,
224 LogicalDecodingContext *ctx,
225 RelationSyncEntry *relentry);
226static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
227static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
228 uint32 hashvalue);
229static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
230 TransactionId xid);
231static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
232 TransactionId xid);
233static void init_tuple_slot(PGOutputData *data, Relation relation,
234 RelationSyncEntry *entry);
235
236/* row filter routines */
237static EState *create_estate_for_relation(Relation rel);
238static void pgoutput_row_filter_init(PGOutputData *data,
239 List *publications,
240 RelationSyncEntry *entry);
241static bool pgoutput_row_filter_exec_expr(ExprState *state,
242 ExprContext *econtext);
243static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
244 TupleTableSlot **new_slot_ptr,
245 RelationSyncEntry *entry,
246 ReorderBufferChangeType *action);
247
248/* column list routines */
249static void pgoutput_column_list_init(PGOutputData *data,
250 List *publications,
251 RelationSyncEntry *entry);
252
253/*
254 * Specify output plugin callbacks
255 */
256void
257_PG_output_plugin_init(OutputPluginCallbacks *cb)
258{
259 cb->startup_cb = pgoutput_startup;
260 cb->begin_cb = pgoutput_begin_txn;
261 cb->change_cb = pgoutput_change;
262 cb->truncate_cb = pgoutput_truncate;
263 cb->message_cb = pgoutput_message;
264 cb->commit_cb = pgoutput_commit_txn;
265
266 cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
267 cb->prepare_cb = pgoutput_prepare_txn;
268 cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
269 cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
270 cb->filter_by_origin_cb = pgoutput_origin_filter;
271 cb->shutdown_cb = pgoutput_shutdown;
272
273 /* transaction streaming */
274 cb->stream_start_cb = pgoutput_stream_start;
275 cb->stream_stop_cb = pgoutput_stream_stop;
276 cb->stream_abort_cb = pgoutput_stream_abort;
277 cb->stream_commit_cb = pgoutput_stream_commit;
278 cb->stream_change_cb = pgoutput_change;
279 cb->stream_message_cb = pgoutput_message;
280 cb->stream_truncate_cb = pgoutput_truncate;
281 /* transaction streaming - two-phase commit */
282 cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
283}
284
285static void
286parse_output_parameters(List *options, PGOutputData *data)
287{
288 ListCell *lc;
289 bool protocol_version_given = false;
290 bool publication_names_given = false;
291 bool binary_option_given = false;
292 bool messages_option_given = false;
293 bool streaming_given = false;
294 bool two_phase_option_given = false;
295 bool origin_option_given = false;
296
297 data->binary = false;
298 data->streaming = LOGICALREP_STREAM_OFF;
299 data->messages = false;
300 data->two_phase = false;
301
302 foreach(lc, options)
303 {
304 DefElem *defel = (DefElem *) lfirst(lc);
305
306 Assert(defel->arg == NULL || IsA(defel->arg, String));
307
308 /* Check each param, whether or not we recognize it */
309 if (strcmp(defel->defname, "proto_version") == 0)
310 {
311 unsigned long parsed;
312 char *endptr;
313
314 if (protocol_version_given)
315 ereport(ERROR,
316 (errcode(ERRCODE_SYNTAX_ERROR),
317 errmsg("conflicting or redundant options")));
318 protocol_version_given = true;
319
320 errno = 0;
321 parsed = strtoul(strVal(defel->arg), &endptr, 10);
322 if (errno != 0 || *endptr != '\0')
323 ereport(ERROR,
324 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
325 errmsg("invalid proto_version")));
326
327 if (parsed > PG_UINT32_MAX)
328 ereport(ERROR,
329 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
330 errmsg("proto_version \"%s\" out of range",
331 strVal(defel->arg))));
332
333 data->protocol_version = (uint32) parsed;
334 }
335 else if (strcmp(defel->defname, "publication_names") == 0)
336 {
337 if (publication_names_given)
338 ereport(ERROR,
339 (errcode(ERRCODE_SYNTAX_ERROR),
340 errmsg("conflicting or redundant options")));
341 publication_names_given = true;
342
343 if (!SplitIdentifierString(strVal(defel->arg), ',',
344 &data->publication_names))
345 ereport(ERROR,
346 (errcode(ERRCODE_INVALID_NAME),
347 errmsg("invalid publication_names syntax")));
348 }
349 else if (strcmp(defel->defname, "binary") == 0)
350 {
351 if (binary_option_given)
352 ereport(ERROR,
353 (errcode(ERRCODE_SYNTAX_ERROR),
354 errmsg("conflicting or redundant options")));
355 binary_option_given = true;
356
357 data->binary = defGetBoolean(defel);
358 }
359 else if (strcmp(defel->defname, "messages") == 0)
360 {
361 if (messages_option_given)
362 ereport(ERROR,
363 (errcode(ERRCODE_SYNTAX_ERROR),
364 errmsg("conflicting or redundant options")));
365 messages_option_given = true;
366
367 data->messages = defGetBoolean(defel);
368 }
369 else if (strcmp(defel->defname, "streaming") == 0)
370 {
371 if (streaming_given)
372 ereport(ERROR,
373 (errcode(ERRCODE_SYNTAX_ERROR),
374 errmsg("conflicting or redundant options")));
375 streaming_given = true;
376
377 data->streaming = defGetStreamingMode(defel);
378 }
379 else if (strcmp(defel->defname, "two_phase") == 0)
380 {
381 if (two_phase_option_given)
382 ereport(ERROR,
383 (errcode(ERRCODE_SYNTAX_ERROR),
384 errmsg("conflicting or redundant options")));
385 two_phase_option_given = true;
386
387 data->two_phase = defGetBoolean(defel);
388 }
389 else if (strcmp(defel->defname, "origin") == 0)
390 {
391 char *origin;
392
393 if (origin_option_given)
394 ereport(ERROR,
395 errcode(ERRCODE_SYNTAX_ERROR),
396 errmsg("conflicting or redundant options"));
397 origin_option_given = true;
398
399 origin = defGetString(defel);
400 if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
401 data->publish_no_origin = true;
402 else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
403 data->publish_no_origin = false;
404 else
405 ereport(ERROR,
406 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
407 errmsg("unrecognized origin value: \"%s\"", origin));
408 }
409 else
410 elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
411 }
412
413 /* Check required options */
414 if (!protocol_version_given)
415 ereport(ERROR,
416 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
417 errmsg("option \"%s\" missing", "proto_version"));
418 if (!publication_names_given)
419 ereport(ERROR,
420 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
421 errmsg("option \"%s\" missing", "publication_names"));
422}
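/*
 * For illustration only: a subscriber normally supplies these options through
 * the walsender's START_REPLICATION command, e.g. (values are just examples):
 *
 *	START_REPLICATION SLOT "mysub" LOGICAL 0/0
 *	    (proto_version '4', publication_names '"pub1","pub2"',
 *	     binary 'false', streaming 'parallel', origin 'any')
 *
 * Each option then arrives in this function as a DefElem in the options list.
 */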
423
424/*
425 * Initialize this plugin
426 */
427static void
428pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
429 bool is_init)
430{
431 PGOutputData *data = palloc0(sizeof(PGOutputData));
432 static bool publication_callback_registered = false;
433
434 /* Create our memory context for private allocations. */
435 data->context = AllocSetContextCreate(ctx->context,
436 "logical replication output context",
437 ALLOCSET_DEFAULT_SIZES);
438
439 data->cachectx = AllocSetContextCreate(ctx->context,
440 "logical replication cache context",
441 ALLOCSET_DEFAULT_SIZES);
442
443 data->pubctx = AllocSetContextCreate(ctx->context,
444 "logical replication publication list context",
445 ALLOCSET_DEFAULT_SIZES);
446
447 ctx->output_plugin_private = data;
448
449 /* This plugin uses binary protocol. */
450 opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
451
452 /*
453 * This is replication start and not slot initialization.
454 *
455 * Parse and validate options passed by the client.
456 */
457 if (!is_init)
458 {
459 /* Parse the params and ERROR if we see any we don't recognize */
460 parse_output_parameters(ctx->output_plugin_options, data);
461
462 /* Check if we support requested protocol */
463 if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
464 ereport(ERROR,
465 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
466 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
467 data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
468
469 if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
470 ereport(ERROR,
471 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
472 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
473 data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
474
475 /*
476 * Decide whether to enable streaming. It is disabled by default, in
477 * which case we just update the flag in decoding context. Otherwise
478 * we only allow it with sufficient version of the protocol, and when
479 * the output plugin supports it.
480 */
481 if (data->streaming == LOGICALREP_STREAM_OFF)
482 ctx->streaming = false;
483 else if (data->streaming == LOGICALREP_STREAM_ON &&
484 data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
485 ereport(ERROR,
486 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
487 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
488 data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
489 else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
490 data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
491 ereport(ERROR,
492 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
493 errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
494 data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
495 else if (!ctx->streaming)
496 ereport(ERROR,
497 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
498 errmsg("streaming requested, but not supported by output plugin")));
499
500 /*
501 * Here, we just check whether the two-phase option is passed by
502 * plugin and decide whether to enable it at later point of time. It
503 * remains enabled if the previous start-up has done so. But we only
504 * allow the option to be passed in with sufficient version of the
505 * protocol, and when the output plugin supports it.
506 */
507 if (!data->two_phase)
508 ctx->twophase_opt_given = false;
509 else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
510 ereport(ERROR,
511 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
512 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
513 data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
514 else if (!ctx->twophase)
515 ereport(ERROR,
516 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
517 errmsg("two-phase commit requested, but not supported by output plugin")));
518 else
519 ctx->twophase_opt_given = true;
520
521 /* Init publication state. */
522 data->publications = NIL;
523 publications_valid = false;
524
525 /*
526 * Register callback for pg_publication if we didn't already do that
527 * during some previous call in this process.
528 */
529 if (!publication_callback_registered)
530 {
531 CacheRegisterSyscacheCallback(PUBLICATIONOID,
532 publication_invalidation_cb,
533 (Datum) 0);
534 publication_callback_registered = true;
535 }
536
537 /* Initialize relation schema cache. */
538 init_rel_sync_cache(CacheMemoryContext);
539 }
540 else
541 {
542 /*
543 * Disable the streaming and prepared transactions during the slot
544 * initialization mode.
545 */
546 ctx->streaming = false;
547 ctx->twophase = false;
548 }
549}
550
551/*
552 * BEGIN callback.
553 *
554 * Don't send the BEGIN message here; instead, postpone it until the first
555 * change. In logical replication, a common scenario is to replicate a set of
556 * tables (instead of all tables), and transactions whose changes touch only
557 * unpublished tables produce empty transactions. Such empty transactions
558 * would send BEGIN and COMMIT messages to subscribers, using bandwidth on
559 * something of little or no use for logical replication.
560 */
561static void
562pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
563{
564 PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
565 sizeof(PGOutputTxnData));
566
567 txn->output_plugin_private = txndata;
568}
569
570/*
571 * Send BEGIN.
572 *
573 * This is called while processing the first change of the transaction.
574 */
575static void
576pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
577{
578 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
579 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
580
581 Assert(txndata);
582 Assert(!txndata->sent_begin_txn);
583
584 OutputPluginPrepareWrite(ctx, !send_replication_origin);
585 logicalrep_write_begin(ctx->out, txn);
586 txndata->sent_begin_txn = true;
587
588 send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
589 send_replication_origin);
590
591 OutputPluginWrite(ctx, true);
592}
593
594/*
595 * COMMIT callback
596 */
597static void
598pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
599 XLogRecPtr commit_lsn)
600{
601 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
602 bool sent_begin_txn;
603
604 Assert(txndata);
605
606 /*
607 * We don't need to send the commit message unless some relevant change
608 * from this transaction has been sent to the downstream.
609 */
610 sent_begin_txn = txndata->sent_begin_txn;
611 OutputPluginUpdateProgress(ctx, !sent_begin_txn);
612 pfree(txndata);
613 txn->output_plugin_private = NULL;
614
615 if (!sent_begin_txn)
616 {
617 elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
618 return;
619 }
620
621 OutputPluginPrepareWrite(ctx, true);
622 logicalrep_write_commit(ctx->out, txn, commit_lsn);
623 OutputPluginWrite(ctx, true);
624}
625
626/*
627 * BEGIN PREPARE callback
628 */
629static void
630pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
631{
632 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
633
634 OutputPluginPrepareWrite(ctx, !send_replication_origin);
635 logicalrep_write_begin_prepare(ctx->out, txn);
636
637 send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
638 send_replication_origin);
639
640 OutputPluginWrite(ctx, true);
641}
642
643/*
644 * PREPARE callback
645 */
646static void
647pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
648 XLogRecPtr prepare_lsn)
649{
650 OutputPluginUpdateProgress(ctx, false);
651
652 OutputPluginPrepareWrite(ctx, true);
653 logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
654 OutputPluginWrite(ctx, true);
655}
656
657/*
658 * COMMIT PREPARED callback
659 */
660static void
661pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
662 XLogRecPtr commit_lsn)
663{
664 OutputPluginUpdateProgress(ctx, false);
665
666 OutputPluginPrepareWrite(ctx, true);
667 logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
668 OutputPluginWrite(ctx, true);
669}
670
671/*
672 * ROLLBACK PREPARED callback
673 */
674static void
675pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
676 ReorderBufferTXN *txn,
677 XLogRecPtr prepare_end_lsn,
678 TimestampTz prepare_time)
679{
680 OutputPluginUpdateProgress(ctx, false);
681
682 OutputPluginPrepareWrite(ctx, true);
683 logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
684 prepare_time);
685 OutputPluginWrite(ctx, true);
686}
687
688/*
689 * Write the current schema of the relation and its ancestor (if any) if not
690 * done yet.
691 */
692static void
693maybe_send_schema(LogicalDecodingContext *ctx,
694 ReorderBufferChange *change,
695 Relation relation, RelationSyncEntry *relentry)
696{
697 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
698 bool schema_sent;
699 TransactionId xid = InvalidTransactionId;
700 TransactionId topxid = InvalidTransactionId;
701
702 /*
703 * Remember XID of the (sub)transaction for the change. We don't care if
704 * it's top-level transaction or not (we have already sent that XID in
705 * start of the current streaming block).
706 *
707 * If we're not in a streaming block, just use InvalidTransactionId and
708 * the write methods will not include it.
709 */
710 if (data->in_streaming)
711 xid = change->txn->xid;
712
713 if (rbtxn_is_subtxn(change->txn))
714 topxid = rbtxn_get_toptxn(change->txn)->xid;
715 else
716 topxid = xid;
717
718 /*
719 * Do we need to send the schema? We do track streamed transactions
720 * separately, because those may be applied later (and the regular
721 * transactions won't see their effects until then) and in an order that
722 * we don't know at this point.
723 *
724 * XXX There is scope for optimization here. Currently, we always send
725 * the schema first time in a streaming transaction but we can probably
726 * avoid that by checking 'relentry->schema_sent' flag. However, before
727 * doing that we need to study its impact on the case where we have a mix
728 * of streaming and non-streaming transactions.
729 */
730 if (data->in_streaming)
731 schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
732 else
733 schema_sent = relentry->schema_sent;
734
735 /* Nothing to do if we already sent the schema. */
736 if (schema_sent)
737 return;
738
739 /*
740 * Send the schema. If the changes will be published using an ancestor's
741 * schema, not the relation's own, send that ancestor's schema before
742 * sending relation's own (XXX - maybe sending only the former suffices?).
743 */
744 if (relentry->publish_as_relid != RelationGetRelid(relation))
745 {
746 Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
747
748 send_relation_and_attrs(ancestor, xid, ctx, relentry);
749 RelationClose(ancestor);
750 }
751
752 send_relation_and_attrs(relation, xid, ctx, relentry);
753
754 if (data->in_streaming)
755 set_schema_sent_in_streamed_txn(relentry, topxid);
756 else
757 relentry->schema_sent = true;
758}
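/*
 * Example of the resulting message order (illustrative): for a partition
 * published via its root (publish_as_relid != relid), the first published
 * change of a transaction (tracked per toplevel xid for streamed
 * transactions) is preceded by
 *
 *	TYPE messages for any user-defined column types,
 *	a RELATION message for the ancestor, and
 *	a RELATION message for the partition itself,
 *
 * after which the actual INSERT/UPDATE/DELETE messages follow.
 */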
759
760/*
761 * Sends a relation
762 */
763static void
764send_relation_and_attrs(Relation relation, TransactionId xid,
765 LogicalDecodingContext *ctx,
766 RelationSyncEntry *relentry)
767{
768 TupleDesc desc = RelationGetDescr(relation);
769 Bitmapset *columns = relentry->columns;
770 PublishGencolsType include_gencols_type = relentry->include_gencols_type;
771 int i;
772
773 /*
774 * Write out type info if needed. We do that only for user-created types.
775 * We use FirstGenbkiObjectId as the cutoff, so that we only consider
776 * objects with hand-assigned OIDs to be "built in", not for instance any
777 * function or type defined in the information_schema. This is important
778 * because only hand-assigned OIDs can be expected to remain stable across
779 * major versions.
780 */
781 for (i = 0; i < desc->natts; i++)
782 {
783 Form_pg_attribute att = TupleDescAttr(desc, i);
784
785 if (!logicalrep_should_publish_column(att, columns,
786 include_gencols_type))
787 continue;
788
789 if (att->atttypid < FirstGenbkiObjectId)
790 continue;
791
792 OutputPluginPrepareWrite(ctx, false);
793 logicalrep_write_typ(ctx->out, xid, att->atttypid);
794 OutputPluginWrite(ctx, false);
795 }
796
797 OutputPluginPrepareWrite(ctx, false);
798 logicalrep_write_rel(ctx->out, xid, relation, columns,
799 include_gencols_type);
800 OutputPluginWrite(ctx, false);
801}
802
803/*
804 * Executor state preparation for evaluation of row filter expressions for the
805 * specified relation.
806 */
807static EState *
808create_estate_for_relation(Relation rel)
809{
810 EState *estate;
811 RangeTblEntry *rte;
812 List *perminfos = NIL;
813
814 estate = CreateExecutorState();
815
816 rte = makeNode(RangeTblEntry);
817 rte->rtekind = RTE_RELATION;
818 rte->relid = RelationGetRelid(rel);
819 rte->relkind = rel->rd_rel->relkind;
820 rte->rellockmode = AccessShareLock;
821
822 addRTEPermissionInfo(&perminfos, rte);
823
824 ExecInitRangeTable(estate, list_make1(rte), perminfos,
826
827 estate->es_output_cid = GetCurrentCommandId(false);
828
829 return estate;
830}
831
832/*
833 * Evaluates row filter.
834 *
835 * If the row filter evaluates to NULL, it is taken as false i.e. the change
836 * isn't replicated.
837 */
838static bool
839pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
840{
841 Datum ret;
842 bool isnull;
843
844 Assert(state != NULL);
845
846 ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
847
848 elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
849 isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
850 isnull ? "true" : "false");
851
852 if (isnull)
853 return false;
854
855 return DatumGetBool(ret);
856}
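/*
 * Example of the NULL-is-false rule above: with a row filter such as
 * (price > 100), a row whose price column is NULL makes the qual evaluate to
 * NULL, so pgoutput_row_filter_exec_expr() returns false and the change is
 * not replicated.
 */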
857
858/*
859 * Make sure the per-entry memory context exists.
860 */
861static void
862pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
863{
864 Relation relation;
865
866 /* The context may already exist, in which case bail out. */
867 if (entry->entry_cxt)
868 return;
869
870 relation = RelationIdGetRelation(entry->publish_as_relid);
871
872 entry->entry_cxt = AllocSetContextCreate(data->cachectx,
873 "entry private context",
874 ALLOCSET_SMALL_SIZES);
875
876 MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
877 RelationGetRelationName(relation));
878}
879
880/*
881 * Initialize the row filter.
882 */
883static void
884pgoutput_row_filter_init(PGOutputData *data, List *publications,
885 RelationSyncEntry *entry)
886{
887 ListCell *lc;
888 List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
889 bool no_filter[] = {false, false, false}; /* One per pubaction */
890 MemoryContext oldctx;
891 int idx;
892 bool has_filter = true;
893 Oid schemaid = get_rel_namespace(entry->publish_as_relid);
894
895 /*
896 * Find if there are any row filters for this relation. If there are, then
897 * prepare the necessary ExprState and cache it in entry->exprstate. To
898 * build an expression state, we need to ensure the following:
899 *
900 * All the given publication-table mappings must be checked.
901 *
902 * Multiple publications might have multiple row filters for this
903 * relation. Since row filter usage depends on the DML operation, there
904 * are multiple lists (one for each operation) to which row filters will
905 * be appended.
906 *
907 * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
908 * expression" so it takes precedence.
909 */
910 foreach(lc, publications)
911 {
912 Publication *pub = lfirst(lc);
913 HeapTuple rftuple = NULL;
914 Datum rfdatum = 0;
915 bool pub_no_filter = true;
916
917 /*
918 * If the publication is FOR ALL TABLES, or the publication includes a
919 * FOR TABLES IN SCHEMA where the table belongs to the referred
920 * schema, then it is treated the same as if there are no row filters
921 * (even if other publications have a row filter).
922 */
923 if (!pub->alltables &&
924 !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
925 ObjectIdGetDatum(schemaid),
926 ObjectIdGetDatum(pub->oid)))
927 {
928 /*
929 * Check for the presence of a row filter in this publication.
930 */
931 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
932 ObjectIdGetDatum(entry->publish_as_relid),
933 ObjectIdGetDatum(pub->oid));
934
935 if (HeapTupleIsValid(rftuple))
936 {
937 /* Null indicates no filter. */
938 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
939 Anum_pg_publication_rel_prqual,
940 &pub_no_filter);
941 }
942 }
943
944 if (pub_no_filter)
945 {
946 if (rftuple)
947 ReleaseSysCache(rftuple);
948
949 no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
950 no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
951 no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
952
953 /*
954 * Quick exit if all the DML actions are published via this
955 * publication.
956 */
957 if (no_filter[PUBACTION_INSERT] &&
958 no_filter[PUBACTION_UPDATE] &&
959 no_filter[PUBACTION_DELETE])
960 {
961 has_filter = false;
962 break;
963 }
964
965 /* No additional work for this publication. Next one. */
966 continue;
967 }
968
969 /* Form the per pubaction row filter lists. */
970 if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
971 rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
972 TextDatumGetCString(rfdatum));
973 if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
974 rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
975 TextDatumGetCString(rfdatum));
976 if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
977 rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
978 TextDatumGetCString(rfdatum));
979
980 ReleaseSysCache(rftuple);
981 } /* loop all subscribed publications */
982
983 /* Clean the row filter */
984 for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
985 {
986 if (no_filter[idx])
987 {
988 list_free_deep(rfnodes[idx]);
989 rfnodes[idx] = NIL;
990 }
991 }
992
993 if (has_filter)
994 {
996
998
999 /*
1000 * Now all the filters for all pubactions are known. Combine them when
1001 * their pubactions are the same.
1002 */
1003 oldctx = MemoryContextSwitchTo(entry->entry_cxt);
1004 entry->estate = create_estate_for_relation(relation);
1005 for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1006 {
1007 List *filters = NIL;
1008 Expr *rfnode;
1009
1010 if (rfnodes[idx] == NIL)
1011 continue;
1012
1013 foreach(lc, rfnodes[idx])
1014 filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
1015
1016 /* combine the row filter and cache the ExprState */
1017 rfnode = make_orclause(filters);
1018 entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1019 } /* for each pubaction */
1020 MemoryContextSwitchTo(oldctx);
1021
1022 RelationClose(relation);
1023 }
1024}
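/*
 * Worked example for the combining rules above (illustrative): assume table t
 * is in two publications that both publish inserts,
 *
 *	CREATE PUBLICATION puba FOR TABLE t WHERE (a > 5);
 *	CREATE PUBLICATION pubb FOR TABLE t WHERE (b <> 0);
 *
 * The cached entry->exprstate[PUBACTION_INSERT] is then the OR of the two
 * quals, i.e. ((a > 5) OR (b <> 0)). If pubb were instead FOR ALL TABLES, or
 * listed t without a WHERE clause, no_filter would take precedence and
 * inserts would be published unfiltered.
 */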
1025
1026/*
1027 * If the table contains a generated column, check for any conflicting
1028 * values of 'publish_generated_columns' parameter in the publications.
1029 */
1030static void
1032 RelationSyncEntry *entry)
1033{
1035 TupleDesc desc = RelationGetDescr(relation);
1036 bool gencolpresent = false;
1037 bool first = true;
1038
1039 /* Check if there is any generated column present. */
1040 for (int i = 0; i < desc->natts; i++)
1041 {
1042 Form_pg_attribute att = TupleDescAttr(desc, i);
1043
1044 if (att->attgenerated)
1045 {
1046 gencolpresent = true;
1047 break;
1048 }
1049 }
1050
1051 /* There are no generated columns to be published. */
1052 if (!gencolpresent)
1053 {
1054 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1055 return;
1056 }
1057
1058 /*
1059 * There may be a conflicting value for 'publish_generated_columns'
1060 * parameter in the publications.
1061 */
1062 foreach_ptr(Publication, pub, publications)
1063 {
1064 /*
1065 * The column list takes precedence over the
1066 * 'publish_generated_columns' parameter. Those will be checked later,
1067 * see pgoutput_column_list_init.
1068 */
1069 if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1070 continue;
1071
1072 if (first)
1073 {
1074 entry->include_gencols_type = pub->pubgencols_type;
1075 first = false;
1076 }
1077 else if (entry->include_gencols_type != pub->pubgencols_type)
1078 ereport(ERROR,
1079 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1080 errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1082 RelationGetRelationName(relation)));
1083 }
1084}
1085
1086/*
1087 * Initialize the column list.
1088 */
1089static void
1090pgoutput_column_list_init(PGOutputData *data, List *publications,
1091 RelationSyncEntry *entry)
1092{
1093 ListCell *lc;
1094 bool first = true;
1096 bool found_pub_collist = false;
1097 Bitmapset *relcols = NULL;
1098
1100
1101 /*
1102 * Find if there are any column lists for this relation. If there are,
1103 * build a bitmap using the column lists.
1104 *
1105 * Multiple publications might have multiple column lists for this
1106 * relation.
1107 *
1108 * Note that we don't support the case where the column list is different
1109 * for the same table when combining publications. See comments atop
1110 * fetch_table_list. But one can later change the publication so we still
1111 * need to check all the given publication-table mappings and report an
1112 * error if any publications have a different column list.
1113 */
1114 foreach(lc, publications)
1115 {
1116 Publication *pub = lfirst(lc);
1117 Bitmapset *cols = NULL;
1118
1119 /* Retrieve the bitmap of columns for a column list publication. */
1120 found_pub_collist |= check_and_fetch_column_list(pub,
1121 entry->publish_as_relid,
1122 entry->entry_cxt, &cols);
1123
1124 /*
1125 * For non-column list publications — e.g. TABLE (without a column
1126 * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1127 * of the table (including generated columns when
1128 * 'publish_generated_columns' parameter is true).
1129 */
1130 if (!cols)
1131 {
1132 /*
1133 * Cache the table columns for the first publication with no
1134 * specified column list to detect publication with a different
1135 * column list.
1136 */
1137 if (!relcols && (list_length(publications) > 1))
1138 {
1140
1141 relcols = pub_form_cols_map(relation,
1142 entry->include_gencols_type);
1143 MemoryContextSwitchTo(oldcxt);
1144 }
1145
1146 cols = relcols;
1147 }
1148
1149 if (first)
1150 {
1151 entry->columns = cols;
1152 first = false;
1153 }
1154 else if (!bms_equal(entry->columns, cols))
1155 ereport(ERROR,
1156 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1157 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1159 RelationGetRelationName(relation)));
1160 } /* loop all subscribed publications */
1161
1162 /*
1163 * If no column list publications exist, columns to be published will be
1164 * computed later according to the 'publish_generated_columns' parameter.
1165 */
1166 if (!found_pub_collist)
1167 entry->columns = NULL;
1168
1169 RelationClose(relation);
1170}
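/*
 * Worked example for the column-list rules above (illustrative): if puba
 * publishes t (a, b) and pubb publishes t (a, b, c), combining them in one
 * subscription raises the "cannot use different column lists" error. The
 * same happens if pubb publishes t without any column list, because the
 * full set of publishable columns is then compared against (a, b).
 */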
1171
1172/*
1173 * Initialize the slot for storing new and old tuples, and build the map that
1174 * will be used to convert the relation's tuples into the ancestor's format.
1175 */
1176static void
1177init_tuple_slot(PGOutputData *data, Relation relation,
1178 RelationSyncEntry *entry)
1179{
1180 MemoryContext oldctx;
1181 TupleDesc oldtupdesc;
1182 TupleDesc newtupdesc;
1183
1184 oldctx = MemoryContextSwitchTo(data->cachectx);
1185
1186 /*
1187 * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1188 * live as long as the cache remains.
1189 */
1190 oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1191 newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1192
1193 entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1194 entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1195
1196 MemoryContextSwitchTo(oldctx);
1197
1198 /*
1199 * Cache the map that will be used to convert the relation's tuples into
1200 * the ancestor's format, if needed.
1201 */
1202 if (entry->publish_as_relid != RelationGetRelid(relation))
1203 {
1204 Relation ancestor = RelationIdGetRelation(entry->publish_as_relid);
1205 TupleDesc indesc = RelationGetDescr(relation);
1206 TupleDesc outdesc = RelationGetDescr(ancestor);
1207
1208 /* Map must live as long as the logical decoding context. */
1209 oldctx = MemoryContextSwitchTo(data->cachectx);
1210
1211 entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1212
1213 MemoryContextSwitchTo(oldctx);
1214 RelationClose(ancestor);
1215 }
1216}
1217
1218/*
1219 * Change is checked against the row filter if any.
1220 *
1221 * Returns true if the change is to be replicated, else false.
1222 *
1223 * For inserts, evaluate the row filter for new tuple.
1224 * For deletes, evaluate the row filter for old tuple.
1225 * For updates, evaluate the row filter for old and new tuple.
1226 *
1227 * For updates, if both evaluations are true, we allow sending the UPDATE and
1228 * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
1229 * only one of the tuples matches the row filter expression, we transform
1230 * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1231 * following rules:
1232 *
1233 * Case 1: old-row (no match) new-row (no match) -> (drop change)
1234 * Case 2: old-row (no match) new row (match) -> INSERT
1235 * Case 3: old-row (match) new-row (no match) -> DELETE
1236 * Case 4: old-row (match) new row (match) -> UPDATE
1237 *
1238 * The new action is updated in the action parameter.
1239 *
1240 * The new slot could be updated when transforming the UPDATE into INSERT,
1241 * because the original new tuple might not have column values from the replica
1242 * identity.
1243 *
1244 * Examples:
1245 * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1246 * Since the old tuple satisfies, the initial table synchronization copied this
1247 * row (or another method was used to guarantee that there is data
1248 * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1249 * row filter, so from a data consistency perspective, that row should be
1250 * removed on the subscriber. The UPDATE should be transformed into a DELETE
1251 * statement and be sent to the subscriber. Keeping this row on the subscriber
1252 * is undesirable because it doesn't reflect what was defined in the row filter
1253 * expression on the publisher. This row on the subscriber would likely not be
1254 * modified by replication again. If someone inserted a new row with the same
1255 * old identifier, replication could stop due to a constraint violation.
1256 *
1257 * Let's say the old tuple doesn't match the row filter but the new tuple does.
1258 * Since the old tuple doesn't satisfy, the initial table synchronization
1259 * probably didn't copy this row. However, after the UPDATE the new tuple does
1260 * satisfy the row filter, so from a data consistency perspective, that row
1261 * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1262 * statements have no effect (it matches no row -- see
1263 * apply_handle_update_internal()). So, the UPDATE should be transformed into a
1264 * INSERT statement and be sent to the subscriber. However, this might surprise
1265 * someone who expects the data set to satisfy the row filter expression on the
1266 * provider.
1267 */
1268static bool
1269pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
1270 TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1271 ReorderBufferChangeType *action)
1272{
1273 TupleDesc desc;
1274 int i;
1275 bool old_matched,
1276 new_matched,
1277 result;
1278 TupleTableSlot *tmp_new_slot;
1279 TupleTableSlot *new_slot = *new_slot_ptr;
1280 ExprContext *ecxt;
1281 ExprState *filter_exprstate;
1282
1283 /*
1284 * We need this map to avoid relying on ReorderBufferChangeType enums
1285 * having specific values.
1286 */
1287 static const int map_changetype_pubaction[] = {
1288 [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
1289 [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
1290 [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
1291 };
1292
1293 Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
1294 *action == REORDER_BUFFER_CHANGE_UPDATE ||
1295 *action == REORDER_BUFFER_CHANGE_DELETE);
1296
1297 Assert(new_slot || old_slot);
1298
1299 /* Get the corresponding row filter */
1300 filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1301
1302 /* Bail out if there is no row filter */
1303 if (!filter_exprstate)
1304 return true;
1305
1306 elog(DEBUG3, "table \"%s.%s\" has row filter",
1308 RelationGetRelationName(relation));
1309
1311
1312 ecxt = GetPerTupleExprContext(entry->estate);
1313
1314 /*
1315 * For the following occasions where there is only one tuple, we can
1316 * evaluate the row filter for that tuple and return.
1317 *
1318 * For inserts, we only have the new tuple.
1319 *
1320 * For updates, we can have only a new tuple when none of the replica
1321 * identity columns changed and none of those columns have external data
1322 * but we still need to evaluate the row filter for the new tuple as the
1323 * existing values of those columns might not match the filter. Also,
1324 * users can use constant expressions in the row filter, so we anyway need
1325 * to evaluate it for the new tuple.
1326 *
1327 * For deletes, we only have the old tuple.
1328 */
1329 if (!new_slot || !old_slot)
1330 {
1331 ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1332 result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1333
1334 return result;
1335 }
1336
1337 /*
1338 * Both the old and new tuples must be valid only for updates and need to
1339 * be checked against the row filter.
1340 */
1341 Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1342
1343 slot_getallattrs(new_slot);
1344 slot_getallattrs(old_slot);
1345
1346 tmp_new_slot = NULL;
1347 desc = RelationGetDescr(relation);
1348
1349 /*
1350 * The new tuple might not have all the replica identity columns, in which
1351 * case it needs to be copied over from the old tuple.
1352 */
1353 for (i = 0; i < desc->natts; i++)
1354 {
1356
1357 /*
1358 * if the column in the new tuple or old tuple is null, nothing to do
1359 */
1360 if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1361 continue;
1362
1363 /*
1364 * Unchanged toasted replica identity columns are only logged in the
1365 * old tuple. Copy this over to the new tuple. The changed (or WAL
1366 * Logged) toast values are always assembled in memory and set as
1367 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1368 */
1369 if (att->attlen == -1 &&
1372 {
1373 if (!tmp_new_slot)
1374 {
1375 tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1376 ExecClearTuple(tmp_new_slot);
1377
1378 memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1379 desc->natts * sizeof(Datum));
1380 memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1381 desc->natts * sizeof(bool));
1382 }
1383
1384 tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1385 tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1386 }
1387 }
1388
1389 ecxt->ecxt_scantuple = old_slot;
1390 old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1391
1392 if (tmp_new_slot)
1393 {
1394 ExecStoreVirtualTuple(tmp_new_slot);
1395 ecxt->ecxt_scantuple = tmp_new_slot;
1396 }
1397 else
1398 ecxt->ecxt_scantuple = new_slot;
1399
1400 new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1401
1402 /*
1403 * Case 1: if both tuples don't match the row filter, bailout. Send
1404 * nothing.
1405 */
1406 if (!old_matched && !new_matched)
1407 return false;
1408
1409 /*
1410 * Case 2: if the old tuple doesn't satisfy the row filter but the new
1411 * tuple does, transform the UPDATE into INSERT.
1412 *
1413 * Use the newly transformed tuple that must contain the column values for
1414 * all the replica identity columns. This is required to ensure that,
1415 * while inserting the tuple in the downstream node, we have all the
1416 * required column values.
1417 */
1418 if (!old_matched && new_matched)
1419 {
1421
1422 if (tmp_new_slot)
1423 *new_slot_ptr = tmp_new_slot;
1424 }
1425
1426 /*
1427 * Case 3: if the old tuple satisfies the row filter but the new tuple
1428 * doesn't, transform the UPDATE into DELETE.
1429 *
1430 * This transformation does not require another tuple. The Old tuple will
1431 * be used for DELETE.
1432 */
1433 else if (old_matched && !new_matched)
1435
1436 /*
1437 * Case 4: if both tuples match the row filter, transformation isn't
1438 * required. (*action is default UPDATE).
1439 */
1440
1441 return true;
1442}
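/*
 * Concrete example of the UPDATE transformation implemented above, assuming
 * a row filter of (id > 10) with the replica identity on id:
 *
 *	UPDATE id: 5  -> 6     old no match, new no match  => change dropped
 *	UPDATE id: 5  -> 15    old no match, new match     => sent as INSERT
 *	UPDATE id: 15 -> 5     old match,    new no match  => sent as DELETE
 *	UPDATE id: 15 -> 16    old match,    new match     => sent as UPDATE
 */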
1443
1444/*
1445 * Sends the decoded DML over wire.
1446 *
1447 * This is called both in streaming and non-streaming modes.
1448 */
1449static void
1450pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1451 Relation relation, ReorderBufferChange *change)
1452{
1453 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1454 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1455 MemoryContext old;
1456 RelationSyncEntry *relentry;
1457 TransactionId xid = InvalidTransactionId;
1458 Relation ancestor = NULL;
1459 Relation targetrel = relation;
1460 ReorderBufferChangeType action = change->action;
1461 TupleTableSlot *old_slot = NULL;
1462 TupleTableSlot *new_slot = NULL;
1463
1464 if (!is_publishable_relation(relation))
1465 return;
1466
1467 /*
1468 * Remember the xid for the change in streaming mode. We need to send xid
1469 * with each change in the streaming mode so that subscriber can make
1470 * their association and on aborts, it can discard the corresponding
1471 * changes.
1472 */
1473 if (data->in_streaming)
1474 xid = change->txn->xid;
1475
1476 relentry = get_rel_sync_entry(data, relation);
1477
1478 /* First check the table filter */
1479 switch (action)
1480 {
1481 case REORDER_BUFFER_CHANGE_INSERT:
1482 if (!relentry->pubactions.pubinsert)
1483 return;
1484 break;
1485 case REORDER_BUFFER_CHANGE_UPDATE:
1486 if (!relentry->pubactions.pubupdate)
1487 return;
1488 break;
1489 case REORDER_BUFFER_CHANGE_DELETE:
1490 if (!relentry->pubactions.pubdelete)
1491 return;
1492
1493 /*
1494 * This is only possible if deletes are allowed even when replica
1495 * identity is not defined for a table. Since the DELETE action
1496 * can't be published, we simply return.
1497 */
1498 if (!change->data.tp.oldtuple)
1499 {
1500 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1501 return;
1502 }
1503 break;
1504 default:
1505 Assert(false);
1506 }
1507
1508 /* Avoid leaking memory by using and resetting our own context */
1509 old = MemoryContextSwitchTo(data->context);
1510
1511 /* Switch relation if publishing via root. */
1512 if (relentry->publish_as_relid != RelationGetRelid(relation))
1513 {
1514 Assert(relation->rd_rel->relispartition);
1515 ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1516 targetrel = ancestor;
1517 }
1518
1519 if (change->data.tp.oldtuple)
1520 {
1521 old_slot = relentry->old_slot;
1522 ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1523
1524 /* Convert tuple if needed. */
1525 if (relentry->attrmap)
1526 {
1528 &TTSOpsVirtual);
1529
1530 old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1531 }
1532 }
1533
1534 if (change->data.tp.newtuple)
1535 {
1536 new_slot = relentry->new_slot;
1537 ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1538
1539 /* Convert tuple if needed. */
1540 if (relentry->attrmap)
1541 {
1543 &TTSOpsVirtual);
1544
1545 new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1546 }
1547 }
1548
1549 /*
1550 * Check row filter.
1551 *
1552 * Updates could be transformed to inserts or deletes based on the results
1553 * of the row filter for old and new tuple.
1554 */
1555 if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1556 goto cleanup;
1557
1558 /*
1559 * Send BEGIN if we haven't yet.
1560 *
1561 * We send the BEGIN message after ensuring that we will actually send the
1562 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1563 * transactions.
1564 */
1565 if (txndata && !txndata->sent_begin_txn)
1566 pgoutput_send_begin(ctx, txn);
1567
1568 /*
1569 * Schema should be sent using the original relation because it also sends
1570 * the ancestor's relation.
1571 */
1572 maybe_send_schema(ctx, change, relation, relentry);
1573
1574 OutputPluginPrepareWrite(ctx, true);
1575
1576 /* Send the data */
1577 switch (action)
1578 {
1579 case REORDER_BUFFER_CHANGE_INSERT:
1580 logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1581 data->binary, relentry->columns,
1582 relentry->include_gencols_type);
1583 break;
1584 case REORDER_BUFFER_CHANGE_UPDATE:
1585 logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1586 new_slot, data->binary, relentry->columns,
1587 relentry->include_gencols_type);
1588 break;
1589 case REORDER_BUFFER_CHANGE_DELETE:
1590 logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1591 data->binary, relentry->columns,
1592 relentry->include_gencols_type);
1593 break;
1594 default:
1595 Assert(false);
1596 }
1597
1598 OutputPluginWrite(ctx, true);
1599
1600cleanup:
1601 if (RelationIsValid(ancestor))
1602 {
1603 RelationClose(ancestor);
1604 ancestor = NULL;
1605 }
1606
1607 /* Drop the new slots that were used to store the converted tuples. */
1608 if (relentry->attrmap)
1609 {
1610 if (old_slot)
1611 ExecDropSingleTupleTableSlot(old_slot);
1612
1613 if (new_slot)
1614 ExecDropSingleTupleTableSlot(new_slot);
1615 }
1616
1617 MemoryContextSwitchTo(old);
1618 MemoryContextReset(data->context);
1619}
1620
1621static void
1622pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1623 int nrelations, Relation relations[], ReorderBufferChange *change)
1624{
1625 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1626 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1627 MemoryContext old;
1628 RelationSyncEntry *relentry;
1629 int i;
1630 int nrelids;
1631 Oid *relids;
1632 TransactionId xid = InvalidTransactionId;
1633
1634 /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1635 if (data->in_streaming)
1636 xid = change->txn->xid;
1637
1638 old = MemoryContextSwitchTo(data->context);
1639
1640 relids = palloc0(nrelations * sizeof(Oid));
1641 nrelids = 0;
1642
1643 for (i = 0; i < nrelations; i++)
1644 {
1645 Relation relation = relations[i];
1646 Oid relid = RelationGetRelid(relation);
1647
1648 if (!is_publishable_relation(relation))
1649 continue;
1650
1651 relentry = get_rel_sync_entry(data, relation);
1652
1653 if (!relentry->pubactions.pubtruncate)
1654 continue;
1655
1656 /*
1657 * Don't send partitions if the publication wants to send only the
1658 * root tables through it.
1659 */
1660 if (relation->rd_rel->relispartition &&
1661 relentry->publish_as_relid != relid)
1662 continue;
1663
1664 relids[nrelids++] = relid;
1665
1666 /* Send BEGIN if we haven't yet */
1667 if (txndata && !txndata->sent_begin_txn)
1668 pgoutput_send_begin(ctx, txn);
1669
1670 maybe_send_schema(ctx, change, relation, relentry);
1671 }
1672
1673 if (nrelids > 0)
1674 {
1675 OutputPluginPrepareWrite(ctx, true);
1676 logicalrep_write_truncate(ctx->out,
1677 xid,
1678 nrelids,
1679 relids,
1680 change->data.truncate.cascade,
1681 change->data.truncate.restart_seqs);
1682 OutputPluginWrite(ctx, true);
1683 }
1684
1685 MemoryContextSwitchTo(old);
1686 MemoryContextReset(data->context);
1687}
1688
1689static void
1690pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
1691 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1692 const char *message)
1693{
1694 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1695 TransactionId xid = InvalidTransactionId;
1696
1697 if (!data->messages)
1698 return;
1699
1700 /*
1701 * Remember the xid for the message in streaming mode. See
1702 * pgoutput_change.
1703 */
1704 if (data->in_streaming)
1705 xid = txn->xid;
1706
1707 /*
1708 * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1709 */
1710 if (transactional)
1711 {
1712 PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
1713
1714 /* Send BEGIN if we haven't yet */
1715 if (txndata && !txndata->sent_begin_txn)
1716 pgoutput_send_begin(ctx, txn);
1717 }
1718
1719 OutputPluginPrepareWrite(ctx, true);
1720 logicalrep_write_message(ctx->out,
1721 xid,
1722 message_lsn,
1723 transactional,
1724 prefix,
1725 sz,
1726 message);
1727 OutputPluginWrite(ctx, true);
1728}
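/*
 * These messages originate from pg_logical_emit_message() on the publisher,
 * for example (illustrative only):
 *
 *	SELECT pg_logical_emit_message(true, 'myprefix', 'payload');
 *
 * A transactional message (first argument true) is decoded together with its
 * transaction and is therefore subject to the deferred-BEGIN handling above;
 * a non-transactional message is sent as soon as it is decoded.
 */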
1729
1730/*
1731 * Return true if the data is associated with an origin and the user has
1732 * requested the changes that don't have an origin, false otherwise.
1733 */
1734static bool
1735pgoutput_origin_filter(LogicalDecodingContext *ctx,
1736 RepOriginId origin_id)
1737{
1738 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1739
1740 if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1741 return true;
1742
1743 return false;
1744}
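/*
 * Example of the origin filter above (illustrative): with the subscription
 * option origin = none, publish_no_origin is true and any change that was
 * itself applied by a logical replication apply worker (and thus carries a
 * non-InvalidRepOriginId origin) is filtered out. This is what allows two
 * nodes to subscribe to each other without re-replicating each other's
 * changes back and forth.
 */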
1745
1746/*
1747 * Shutdown the output plugin.
1748 *
1749 * Note, we don't need to clean the data->context, data->cachectx, and
1750 * data->pubctx as they are child contexts of the ctx->context so they
1751 * will be cleaned up by logical decoding machinery.
1752 */
1753static void
1754pgoutput_shutdown(LogicalDecodingContext *ctx)
1755{
1756 if (RelationSyncCache)
1757 {
1758 hash_destroy(RelationSyncCache);
1759 RelationSyncCache = NULL;
1760 }
1761}
1762
1763/*
1764 * Load publications from the list of publication names.
1765 */
1766static List *
1767LoadPublications(List *pubnames)
1768{
1769 List *result = NIL;
1770 ListCell *lc;
1771
1772 foreach(lc, pubnames)
1773 {
1774 char *pubname = (char *) lfirst(lc);
1775 Publication *pub = GetPublicationByName(pubname, false);
1776
1777 result = lappend(result, pub);
1778 }
1779
1780 return result;
1781}
1782
1783/*
1784 * Publication syscache invalidation callback.
1785 *
1786 * Called for invalidations on pg_publication.
1787 */
1788static void
1789publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
1790{
1791 publications_valid = false;
1792
1793 /*
1794 * Also invalidate per-relation cache so that next time the filtering info
1795 * is checked it will be updated with the new publication settings.
1796 */
1797 rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
1798}
1799
1800/*
1801 * START STREAM callback
1802 */
1803static void
1804pgoutput_stream_start(struct LogicalDecodingContext *ctx,
1805 ReorderBufferTXN *txn)
1806{
1807 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1808 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1809
1810 /* we can't nest streaming of transactions */
1811 Assert(!data->in_streaming);
1812
1813 /*
1814 * If we already sent the first stream for this transaction then don't
1815 * send the origin id in the subsequent streams.
1816 */
1817 if (rbtxn_is_streamed(txn))
1818 send_replication_origin = false;
1819
1820 OutputPluginPrepareWrite(ctx, !send_replication_origin);
1821 logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
1822
1823 send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
1824 send_replication_origin);
1825
1826 OutputPluginWrite(ctx, true);
1827
1828 /* we're streaming a chunk of transaction now */
1829 data->in_streaming = true;
1830}
1831
1832/*
1833 * STOP STREAM callback
1834 */
1835static void
1836pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
1837 ReorderBufferTXN *txn)
1838{
1839 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1840
1841 /* we should be streaming a transaction */
1842 Assert(data->in_streaming);
1843
1844 OutputPluginPrepareWrite(ctx, true);
1845 logicalrep_write_stream_stop(ctx->out);
1846 OutputPluginWrite(ctx, true);
1847
1848 /* we've stopped streaming a transaction */
1849 data->in_streaming = false;
1850}
1851
1852/*
1853 * Notify downstream to discard the streamed transaction (along with all
1854 * its subtransactions, if it's a toplevel transaction).
1855 */
1856static void
1857pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
1858 ReorderBufferTXN *txn,
1859 XLogRecPtr abort_lsn)
1860{
1861 ReorderBufferTXN *toptxn;
1862 PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
1863 bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1864
1865 /*
1866 * The abort should happen outside streaming block, even for streamed
1867 * transactions. The transaction has to be marked as streamed, though.
1868 */
1869 Assert(!data->in_streaming);
1870
1871 /* determine the toplevel transaction */
1872 toptxn = rbtxn_get_toptxn(txn);
1873
1874 Assert(rbtxn_is_streamed(toptxn));
1875
1876 OutputPluginPrepareWrite(ctx, true);
1877 logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1878 txn->xact_time.abort_time, write_abort_info);
1879
1880 OutputPluginWrite(ctx, true);
1881
1882 cleanup_rel_sync_cache(toptxn->xid, false);
1883}
1884
1885/*
1886 * Notify downstream to apply the streamed transaction (along with all
1887 * its subtransactions).
1888 */
1889static void
1890pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
1891 ReorderBufferTXN *txn,
1892 XLogRecPtr commit_lsn)
1893{
1895
1896 /*
1897 * The commit should happen outside streaming block, even for streamed
1898 * transactions. The transaction has to be marked as streamed, though.
1899 */
1900 Assert(!data->in_streaming);
1902
1903 OutputPluginUpdateProgress(ctx, false);
1904
1905 OutputPluginPrepareWrite(ctx, true);
1906 logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1907 OutputPluginWrite(ctx, true);
1908
1909 cleanup_rel_sync_cache(txn->xid, true);
1910}
1911
1912/*
1913 * PREPARE callback (for streaming two-phase commit).
1914 *
1915 * Notify the downstream to prepare the transaction.
1916 */
1917static void
1918pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1919 ReorderBufferTXN *txn,
1920 XLogRecPtr prepare_lsn)
1921{
1923
1924 OutputPluginUpdateProgress(ctx, false);
1925 OutputPluginPrepareWrite(ctx, true);
1926 logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1927 OutputPluginWrite(ctx, true);
1928}
1929
1930/*
1931 * Initialize the relation schema sync cache for a decoding session.
1932 *
1933 * The hash table is destroyed at the end of a decoding session. While
1934 * relcache invalidations still exist and will still be invoked, they
1935 * will just see the null hash table global and take no action.
1936 */
1937static void
1938init_rel_sync_cache(MemoryContext cachectx)
1939{
1940 HASHCTL ctl;
1941 static bool relation_callbacks_registered = false;
1942
1943 /* Nothing to do if hash table already exists */
1944 if (RelationSyncCache != NULL)
1945 return;
1946
1947 /* Make a new hash table for the cache */
1948 ctl.keysize = sizeof(Oid);
1949 ctl.entrysize = sizeof(RelationSyncEntry);
1950 ctl.hcxt = cachectx;
1951
1952 RelationSyncCache = hash_create("logical replication output relation cache",
1953 128, &ctl,
1954 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1955
1956 Assert(RelationSyncCache != NULL);
1957
1958 /* No more to do if we already registered callbacks */
1959 if (relation_callbacks_registered)
1960 return;
1961
1962 /* We must update the cache entry for a relation after a relcache flush */
1963 CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
1964
1965 /*
1966 * Flush all cache entries after a pg_namespace change, in case it was a
1967 * schema rename affecting a relation being replicated.
1968 */
1969 CacheRegisterSyscacheCallback(NAMESPACEOID,
1970 rel_sync_cache_publication_cb,
1971 (Datum) 0);
1972
1973 /*
1974 * Flush all cache entries after any publication changes. (We need no
1975 * callback entry for pg_publication, because publication_invalidation_cb
1976 * will take care of it.)
1977 */
1978 CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
1979 rel_sync_cache_publication_cb,
1980 (Datum) 0);
1981 CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
1982 rel_sync_cache_publication_cb,
1983 (Datum) 0);
1984
1985 relation_callbacks_registered = true;
1986}
1987
1988/*
1989 * We expect relatively small number of streamed transactions.
1990 */
1991static bool
1992get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
1993{
1994 return list_member_xid(entry->streamed_txns, xid);
1995}
1996
1997/*
1998 * Add the xid in the rel sync entry for which we have already sent the schema
1999 * of the relation.
2000 */
2001static void
2002set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
2003{
2004 MemoryContext oldctx;
2005
2006 oldctx = MemoryContextSwitchTo(CacheMemoryContext);
2007
2008 entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2009
2010 MemoryContextSwitchTo(oldctx);
2011}
2012
2013/*
2014 * Find or create entry in the relation schema cache.
2015 *
2016 * This looks up publications that the given relation is directly or
2017 * indirectly part of (the latter if it's really the relation's ancestor that
2018 * is part of a publication) and fills up the found entry with the information
2019 * about which operations to publish and whether to use an ancestor's schema
2020 * when publishing.
2021 */
2022static RelationSyncEntry *
2023get_rel_sync_entry(PGOutputData *data, Relation relation)
2024{
2025 RelationSyncEntry *entry;
2026 bool found;
2027 MemoryContext oldctx;
2028 Oid relid = RelationGetRelid(relation);
2029
2030 Assert(RelationSyncCache != NULL);
2031
2032 /* Find cached relation info, creating if not found */
2033 entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
2034 &relid,
2035 HASH_ENTER, &found);
2036 Assert(entry != NULL);
2037
2038 /* initialize entry, if it's new */
2039 if (!found)
2040 {
2041 entry->replicate_valid = false;
2042 entry->schema_sent = false;
2043 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2044 entry->streamed_txns = NIL;
2045 entry->pubactions.pubinsert = entry->pubactions.pubupdate =
2046 entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
2047 entry->new_slot = NULL;
2048 entry->old_slot = NULL;
2049 memset(entry->exprstate, 0, sizeof(entry->exprstate));
2050 entry->entry_cxt = NULL;
2051 entry->publish_as_relid = InvalidOid;
2052 entry->columns = NULL;
2053 entry->attrmap = NULL;
2054 }
2055
2056 /* Validate the entry */
2057 if (!entry->replicate_valid)
2058 {
2059 Oid schemaId = get_rel_namespace(relid);
2060 List *pubids = GetRelationPublications(relid);
2061
2062 /*
2063 * We don't acquire a lock on the namespace system table as we build
2064 * the cache entry using a historic snapshot and all the later changes
2065 * are absorbed while decoding WAL.
2066 */
2067 List *schemaPubids = GetSchemaPublications(schemaId);
2068 ListCell *lc;
2069 Oid publish_as_relid = relid;
2070 int publish_ancestor_level = 0;
2071 bool am_partition = get_rel_relispartition(relid);
2072 char relkind = get_rel_relkind(relid);
2073 List *rel_publications = NIL;
2074
2075 /* Reload publications if needed before use. */
2076 if (!publications_valid)
2077 {
2078 MemoryContextReset(data->pubctx);
2079
2080 oldctx = MemoryContextSwitchTo(data->pubctx);
2081 data->publications = LoadPublications(data->publication_names);
2082 MemoryContextSwitchTo(oldctx);
2083 publications_valid = true;
2084 }
2085
2086 /*
2087 * Reset schema_sent status as the relation definition may have
2088 * changed. Also reset pubactions to empty in case rel was dropped
2089 * from a publication. Also free any objects that depended on the
2090 * earlier definition.
2091 */
2092 entry->schema_sent = false;
2093 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2094 list_free(entry->streamed_txns);
2095 entry->streamed_txns = NIL;
2096 bms_free(entry->columns);
2097 entry->columns = NULL;
2098 entry->pubactions.pubinsert = false;
2099 entry->pubactions.pubupdate = false;
2100 entry->pubactions.pubdelete = false;
2101 entry->pubactions.pubtruncate = false;
2102
2103 /*
2104 * Tuple slot cleanup. (Slots will be rebuilt later if needed.)
2105 */
2106 if (entry->old_slot)
2107 {
2108 TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2109
2110 Assert(desc->tdrefcount == -1);
2111
2112 ExecDropSingleTupleTableSlot(entry->old_slot);
2113
2114 /*
2115 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2116 * do it now to avoid any leaks.
2117 */
2118 FreeTupleDesc(desc);
2119 }
2120 if (entry->new_slot)
2121 {
2122 TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2123
2124 Assert(desc->tdrefcount == -1);
2125
2126 ExecDropSingleTupleTableSlot(entry->new_slot);
2127
2128 /*
2129 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2130 * do it now to avoid any leaks.
2131 */
2132 FreeTupleDesc(desc);
2133 }
2134
2135 entry->old_slot = NULL;
2136 entry->new_slot = NULL;
2137
2138 if (entry->attrmap)
2139 free_attrmap(entry->attrmap);
2140 entry->attrmap = NULL;
2141
2142 /*
2143 * Row filter cache cleanups.
2144 */
2145 if (entry->entry_cxt)
2146 MemoryContextDelete(entry->entry_cxt);
2147
2148 entry->entry_cxt = NULL;
2149 entry->estate = NULL;
2150 memset(entry->exprstate, 0, sizeof(entry->exprstate));
2151
2152 /*
2153 * Build publication cache. We can't use one provided by relcache as
2154 * relcache considers all publications that the given relation is in,
2155 * but here we only need to consider ones that the subscriber
2156 * requested.
2157 */
2158 foreach(lc, data->publications)
2159 {
2160 Publication *pub = lfirst(lc);
2161 bool publish = false;
2162
2163 /*
2164 * Under what relid should we publish changes in this publication?
2165 * We'll use the top-most relid across all publications. Also
2166 * track the ancestor level for this publication.
2167 */
2168 Oid pub_relid = relid;
2169 int ancestor_level = 0;
2170
2171 /*
2172 * If this is a FOR ALL TABLES publication, pick the partition
2173 * root and set the ancestor level accordingly.
2174 */
2175 if (pub->alltables)
2176 {
2177 publish = true;
2178 if (pub->pubviaroot && am_partition)
2179 {
2180 List *ancestors = get_partition_ancestors(relid);
2181
2182 pub_relid = llast_oid(ancestors);
2183 ancestor_level = list_length(ancestors);
2184 }
2185 }
2186
2187 if (!publish)
2188 {
2189 bool ancestor_published = false;
2190
2191 /*
2192 * For a partition, check if any of the ancestors are
2193 * published. If so, note down the topmost ancestor that is
2194 * published via this publication, which will be used as the
2195 * relation via which to publish the partition's changes.
2196 */
2197 if (am_partition)
2198 {
2199 Oid ancestor;
2200 int level;
2201 List *ancestors = get_partition_ancestors(relid);
2202
2203 ancestor = GetTopMostAncestorInPublication(pub->oid,
2204 ancestors,
2205 &level);
2206
2207 if (ancestor != InvalidOid)
2208 {
2209 ancestor_published = true;
2210 if (pub->pubviaroot)
2211 {
2212 pub_relid = ancestor;
2213 ancestor_level = level;
2214 }
2215 }
2216 }
2217
2218 if (list_member_oid(pubids, pub->oid) ||
2219 list_member_oid(schemaPubids, pub->oid) ||
2220 ancestor_published)
2221 publish = true;
2222 }
2223
2224 /*
2225 * If the relation is to be published, determine actions to
2226 * publish, and list of columns, if appropriate.
2227 *
2228 * Don't publish changes for partitioned tables, because
2229 * publishing those of its partitions suffices, unless partition
2230 * changes won't be published due to pubviaroot being set.
2231 */
2232 if (publish &&
2233 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2234 {
2235 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2236 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2237 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2238 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2239
2240 /*
2241 * We want to publish the changes as the top-most ancestor
2242 * across all publications. So we need to check if the already
2243 * calculated level is higher than the new one. If yes, we can
2244 * ignore the new value (as it's a child). Otherwise the new
2245 * value is an ancestor, so we keep it.
2246 */
2247 if (publish_ancestor_level > ancestor_level)
2248 continue;
2249
2250 /*
2251 * If we found an ancestor higher up in the tree, discard the
2252 * list of publications through which we replicate it, and use
2253 * the new ancestor.
2254 */
2255 if (publish_ancestor_level < ancestor_level)
2256 {
2257 publish_as_relid = pub_relid;
2258 publish_ancestor_level = ancestor_level;
2259
2260 /* reset the publication list for this relation */
2261 rel_publications = NIL;
2262 }
2263 else
2264 {
2265 /* Same ancestor level, has to be the same OID. */
2266 Assert(publish_as_relid == pub_relid);
2267 }
2268
2269 /* Track publications for this ancestor. */
2270 rel_publications = lappend(rel_publications, pub);
2271 }
2272 }
2273
2274 entry->publish_as_relid = publish_as_relid;
2275
2276 /*
2277 * Initialize the tuple slot, map, and row filter. These are only used
2278 * when publishing inserts, updates, or deletes.
2279 */
2280 if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2281 entry->pubactions.pubdelete)
2282 {
2283 /* Initialize the tuple slot and map */
2284 init_tuple_slot(data, relation, entry);
2285
2286 /* Initialize the row filter */
2287 pgoutput_row_filter_init(data, rel_publications, entry);
2288
2289 /* Check whether to publish generated columns. */
2290 check_and_init_gencol(data, rel_publications, entry);
2291
2292 /* Initialize the column list */
2293 pgoutput_column_list_init(data, rel_publications, entry);
2294 }
2295
2296 list_free(pubids);
2297 list_free(schemaPubids);
2298 list_free(rel_publications);
2299
2300 entry->replicate_valid = true;
2301 }
2302
2303 return entry;
2304}
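A caller-side sketch of how a change callback can consult the returned entry; pgoutput_change() in this file performs the real (more elaborate) checks, and the helper name below is hypothetical.

/* Hypothetical helper: decide whether an INSERT on this relation should be
 * forwarded at all, based on the cached publication actions. */
static bool
example_should_publish_insert(PGOutputData *data, Relation relation)
{
	RelationSyncEntry *relentry;

	if (!is_publishable_relation(relation))
		return false;

	relentry = get_rel_sync_entry(data, relation);

	/* pubactions were accumulated from all matching publications above. */
	return relentry->pubactions.pubinsert;
}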
2305
2306/*
2307 * Cleanup list of streamed transactions and update the schema_sent flag.
2308 *
2309 * When a streamed transaction commits or aborts, we need to remove the
2310 * toplevel XID from the schema cache. If the transaction aborted, the
2311 * subscriber will simply throw away the schema records we streamed, so
2312 * we don't need to do anything else.
2313 *
2314 * If the transaction is committed, the subscriber will update the relation
2315 * cache - so tweak the schema_sent flag accordingly.
2316 */
2317static void
2318cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2319{
2320 HASH_SEQ_STATUS hash_seq;
2321 RelationSyncEntry *entry;
2322
2323 Assert(RelationSyncCache != NULL);
2324
2325 hash_seq_init(&hash_seq, RelationSyncCache);
2326 while ((entry = hash_seq_search(&hash_seq)) != NULL)
2327 {
2328 /*
2329 * We can set the schema_sent flag for an entry whose list contains the
2330 * committed xid, as that ensures the subscriber already has the
2331 * corresponding schema; we don't need to send it again unless the
2332 * relation is invalidated.
2333 */
2334 foreach_xid(streamed_txn, entry->streamed_txns)
2335 {
2336 if (xid == streamed_txn)
2337 {
2338 if (is_commit)
2339 entry->schema_sent = true;
2340
2341 entry->streamed_txns =
2342 foreach_delete_current(entry->streamed_txns, streamed_txn);
2343 break;
2344 }
2345 }
2346 }
2347}
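For illustration, a sketch of the expected call sites: the stream-commit and stream-abort paths (pgoutput_stream_commit() / pgoutput_stream_abort() earlier in this file) pass the toplevel xid and whether the transaction committed. The wrapper name below is hypothetical.

/* Hypothetical wrapper: on commit the subscriber has applied the streamed
 * schema, so schema_sent is set; on abort the xid is simply forgotten. */
static void
example_stream_finished(TransactionId toplevel_xid, bool committed)
{
	cleanup_rel_sync_cache(toplevel_xid, committed);
}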
2348
2349/*
2350 * Relcache invalidation callback
2351 */
2352static void
2353rel_sync_cache_relation_cb(Datum arg, Oid relid)
2354{
2355 RelationSyncEntry *entry;
2356
2357 /*
2358 * We can get here if the plugin was used via the SQL interface, as the
2359 * RelationSyncCache is destroyed when decoding finishes, but there is
2360 * no way to unregister the relcache invalidation callback.
2361 */
2362 if (RelationSyncCache == NULL)
2363 return;
2364
2365 /*
2366 * Nobody keeps pointers to entries in this hash table around outside
2367 * logical decoding callback calls - but invalidation events can come in
2368 * *during* a callback if we do any syscache access in the callback.
2369 * Because of that we must mark the cache entry as invalid but not damage
2370 * any of its substructure here. The next get_rel_sync_entry() call will
2371 * rebuild it all.
2372 */
2373 if (OidIsValid(relid))
2374 {
2375 /*
2376 * Getting invalidations for relations that aren't in the table is
2377 * entirely normal. So we don't care if it's found or not.
2378 */
2379 entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2380 HASH_FIND, NULL);
2381 if (entry != NULL)
2382 entry->replicate_valid = false;
2383 }
2384 else
2385 {
2386 /* Whole cache must be flushed. */
2387 HASH_SEQ_STATUS status;
2388
2389 hash_seq_init(&status, RelationSyncCache);
2390 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2391 {
2392 entry->replicate_valid = false;
2393 }
2394 }
2395}
2396
2397/*
2398 * Publication relation/schema map syscache invalidation callback
2399 *
2400 * Called for invalidations on pg_publication, pg_publication_rel,
2401 * pg_publication_namespace, and pg_namespace.
2402 */
2403static void
2404rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2405{
2406 HASH_SEQ_STATUS status;
2407 RelationSyncEntry *entry;
2408
2409 /*
2410 * We can get here if the plugin was used via the SQL interface, as the
2411 * RelationSyncCache is destroyed when decoding finishes, but there is
2412 * no way to unregister the invalidation callbacks.
2413 */
2414 if (RelationSyncCache == NULL)
2415 return;
2416
2417 /*
2418 * We have no easy way to identify which cache entries this invalidation
2419 * event might have affected, so just mark them all invalid.
2420 */
2421 hash_seq_init(&status, RelationSyncCache);
2422 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2423 {
2424 entry->replicate_valid = false;
2425 }
2426}
2427
2428/* Send Replication origin */
2429static void
2430send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2431 XLogRecPtr origin_lsn, bool send_origin)
2432{
2433 if (send_origin)
2434 {
2435 char *origin;
2436
2437 /*----------
2438 * XXX: which behaviour do we want here?
2439 *
2440 * Alternatives:
2441 * - don't send origin message if origin name not found
2442 * (that's what we do now)
2443 * - throw error - that will break replication, not good
2444 * - send some special "unknown" origin
2445 *----------
2446 */
2447 if (replorigin_by_oid(origin_id, true, &origin))
2448 {
2449 /* Message boundary */
2450 OutputPluginWrite(ctx, false);
2451 OutputPluginPrepareWrite(ctx, true);
2452
2453 logicalrep_write_origin(ctx->out, origin, origin_lsn);
2454 }
2455 }
2456}
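A simplified sketch of the sending side: a begin path prepares a write, emits the BEGIN message, and lets send_repl_origin() append the optional ORIGIN message before the final write. The real code is pgoutput_send_begin() earlier in this file; the helper name here is hypothetical and omits the txndata bookkeeping.

/* Hypothetical, simplified version of the begin path. */
static void
example_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
	bool		send_origin = (txn->origin_id != InvalidRepOriginId);

	/* If an origin message will follow, this is not the last write. */
	OutputPluginPrepareWrite(ctx, !send_origin);
	logicalrep_write_begin(ctx->out, txn);

	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_origin);

	OutputPluginWrite(ctx, true);
}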
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:259
void free_attrmap(AttrMap *map)
Definition: attmap.c:56
AttrMap * build_attrmap_by_name_if_req(TupleDesc indesc, TupleDesc outdesc, bool missing_ok)
Definition: attmap.c:263
Bitmapset * bms_make_singleton(int x)
Definition: bitmapset.c:216
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:142
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
static void cleanup(void)
Definition: bootstrap.c:713
#define TextDatumGetCString(d)
Definition: builtins.h:98
#define PG_UINT32_MAX
Definition: c.h:547
#define PG_USED_FOR_ASSERTS_ONLY
Definition: c.h:204
#define Assert(condition)
Definition: c.h:815
uint32_t uint32
Definition: c.h:488
uint32 TransactionId
Definition: c.h:609
#define OidIsValid(objectId)
Definition: c.h:732
size_t Size
Definition: c.h:562
int64 TimestampTz
Definition: timestamp.h:39
char * defGetString(DefElem *def)
Definition: define.c:35
bool defGetBoolean(DefElem *def)
Definition: define.c:94
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:955
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:865
void * hash_seq_search(HASH_SEQ_STATUS *status)
Definition: dynahash.c:1420
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
Definition: dynahash.c:352
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
Definition: dynahash.c:1385
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define DEBUG3
Definition: elog.h:28
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
ExprState * ExecPrepareExpr(Expr *node, EState *estate)
Definition: execExpr.c:765
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1425
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1441
TupleTableSlot * ExecStoreVirtualTuple(TupleTableSlot *slot)
Definition: execTuples.c:1739
const TupleTableSlotOps TTSOpsHeapTuple
Definition: execTuples.c:85
TupleTableSlot * MakeTupleTableSlot(TupleDesc tupleDesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1299
TupleTableSlot * ExecStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot, bool shouldFree)
Definition: execTuples.c:1539
void ExecInitRangeTable(EState *estate, List *rangeTable, List *permInfos, Bitmapset *unpruned_relids)
Definition: execUtils.c:774
EState * CreateExecutorState(void)
Definition: execUtils.c:88
#define ResetPerTupleExprContext(estate)
Definition: executor.h:572
#define GetPerTupleExprContext(estate)
Definition: executor.h:563
static Datum ExecEvalExprSwitchContext(ExprState *state, ExprContext *econtext, bool *isNull)
Definition: executor.h:361
@ HASH_FIND
Definition: hsearch.h:113
@ HASH_ENTER
Definition: hsearch.h:114
#define HASH_CONTEXT
Definition: hsearch.h:102
#define HASH_ELEM
Definition: hsearch.h:95
#define HASH_BLOBS
Definition: hsearch.h:97
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, Datum arg)
Definition: inval.c:1749
void CacheRegisterSyscacheCallback(int cacheid, SyscacheCallbackFunction func, Datum arg)
Definition: inval.c:1707
int i
Definition: isn.c:72
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:76
List * lappend(List *list, void *datum)
Definition: list.c:339
List * lappend_xid(List *list, TransactionId datum)
Definition: list.c:393
bool list_member_xid(const List *list, TransactionId datum)
Definition: list.c:742
void list_free(List *list)
Definition: list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
void list_free_deep(List *list)
Definition: list.c:1560
#define AccessShareLock
Definition: lockdefs.h:36
void OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:703
void OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx, bool skipped_xact)
Definition: logical.c:716
void OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
Definition: logical.c:690
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
Definition: logicalproto.h:44
#define LOGICALREP_PROTO_MIN_VERSION_NUM
Definition: logicalproto.h:40
#define LOGICALREP_PROTO_STREAM_VERSION_NUM
Definition: logicalproto.h:42
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
Definition: logicalproto.h:43
#define LOGICALREP_PROTO_MAX_VERSION_NUM
Definition: logicalproto.h:45
bool get_rel_relispartition(Oid relid)
Definition: lsyscache.c:2054
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2030
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1979
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3393
Expr * make_orclause(List *orclauses)
Definition: makefuncs.c:696
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:383
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1215
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
MemoryContext CacheMemoryContext
Definition: mcxt.c:152
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define ALLOCSET_SMALL_SIZES
Definition: memutils.h:170
#define MemoryContextCopyAndSetIdentifier(cxt, id)
Definition: memutils.h:101
#define IsA(nodeptr, _type_)
Definition: nodes.h:158
#define makeNode(_type_)
Definition: nodes.h:155
bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
Definition: origin.c:469
#define InvalidRepOriginId
Definition: origin.h:33
@ OUTPUT_PLUGIN_BINARY_OUTPUT
Definition: output_plugin.h:19
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
RTEPermissionInfo * addRTEPermissionInfo(List **rteperminfos, RangeTblEntry *rte)
@ RTE_RELATION
Definition: parsenodes.h:1026
List * get_partition_ancestors(Oid relid)
Definition: partition.c:134
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:200
void * arg
const void * data
#define lfirst(lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define foreach_delete_current(lst, var_or_cell)
Definition: pg_list.h:391
#define foreach_xid(var, lst)
Definition: pg_list.h:472
#define list_make1(x1)
Definition: pg_list.h:212
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
#define llast_oid(l)
Definition: pg_list.h:200
List * GetRelationPublications(Oid relid)
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
List * GetSchemaPublications(Oid schemaid)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
Bitmapset * pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols)
bool is_publishable_relation(Relation rel)
static List * LoadPublications(List *pubnames)
Definition: pgoutput.c:1767
static void pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:576
static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:2404
struct RelationSyncEntry RelationSyncEntry
static void pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
Definition: pgoutput.c:862
static void parse_output_parameters(List *options, PGOutputData *data)
Definition: pgoutput.c:286
static void init_tuple_slot(PGOutputData *data, Relation relation, RelationSyncEntry *entry)
Definition: pgoutput.c:1177
static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
Definition: pgoutput.c:839
static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change)
Definition: pgoutput.c:1450
#define NUM_ROWFILTER_PUBACTIONS
Definition: pgoutput.c:103
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:630
struct PGOutputTxnData PGOutputTxnData
static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx, RelationSyncEntry *relentry)
Definition: pgoutput.c:764
static void pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
Definition: pgoutput.c:428
static void pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change)
Definition: pgoutput.c:1622
static void init_rel_sync_cache(MemoryContext cachectx)
Definition: pgoutput.c:1938
RowFilterPubAction
Definition: pgoutput.c:97
@ PUBACTION_INSERT
Definition: pgoutput.c:98
@ PUBACTION_UPDATE
Definition: pgoutput.c:99
@ PUBACTION_DELETE
Definition: pgoutput.c:100
PG_MODULE_MAGIC
Definition: pgoutput.c:39
static void rel_sync_cache_relation_cb(Datum arg, Oid relid)
Definition: pgoutput.c:2353
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:647
static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Relation relation)
Definition: pgoutput.c:2023
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id)
Definition: pgoutput.c:1735
static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin)
Definition: pgoutput.c:2430
static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: pgoutput.c:675
static void pgoutput_shutdown(LogicalDecodingContext *ctx)
Definition: pgoutput.c:1754
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
Definition: pgoutput.c:2318
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn)
Definition: pgoutput.c:1857
static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: pgoutput.c:1918
static void maybe_send_schema(LogicalDecodingContext *ctx, ReorderBufferChange *change, Relation relation, RelationSyncEntry *relentry)
Definition: pgoutput.c:693
static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:562
static HTAB * RelationSyncCache
Definition: pgoutput.c:217
static void pgoutput_row_filter_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:884
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:1890
static void check_and_init_gencol(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:1031
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:661
static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot, TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry, ReorderBufferChangeType *action)
Definition: pgoutput.c:1269
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:2002
static void pgoutput_column_list_init(PGOutputData *data, List *publications, RelationSyncEntry *entry)
Definition: pgoutput.c:1090
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1836
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
Definition: pgoutput.c:1804
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
Definition: pgoutput.c:1789
void _PG_output_plugin_init(OutputPluginCallbacks *cb)
Definition: pgoutput.c:257
static void pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: pgoutput.c:1690
static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: pgoutput.c:598
static bool publications_valid
Definition: pgoutput.c:83
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
Definition: pgoutput.c:1992
static EState * create_estate_for_relation(Relation rel)
Definition: pgoutput.c:808
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
static bool DatumGetBool(Datum X)
Definition: postgres.h:95
uintptr_t Datum
Definition: postgres.h:69
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
void logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:78
void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time)
Definition: proto.c:293
void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:403
void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn)
Definition: proto.c:374
void logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:667
void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid, XLogRecPtr abort_lsn, TimestampTz abort_time, bool write_abort_info)
Definition: proto.c:1158
void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message)
Definition: proto.c:640
void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:187
void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
Definition: proto.c:723
void logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:528
void logicalrep_write_truncate(StringInfo out, TransactionId xid, int nrelids, Oid relids[], bool cascade, bool restart_seqs)
Definition: proto.c:583
void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:49
void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:237
void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
Definition: proto.c:1104
void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
Definition: proto.c:353
void logicalrep_write_begin_prepare(StringInfo out, ReorderBufferTXN *txn)
Definition: proto.c:116
bool logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:1279
void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment)
Definition: proto.c:1061
void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, bool binary, Bitmapset *columns, PublishGencolsType include_gencols_type)
Definition: proto.c:450
void logicalrep_write_stream_stop(StringInfo out)
Definition: proto.c:1095
tree ctl
Definition: radixtree.h:1838
void * stringToNode(const char *str)
Definition: read.c:90
#define RelationGetRelid(relation)
Definition: rel.h:506
#define RelationGetDescr(relation)
Definition: rel.h:532
#define RelationGetRelationName(relation)
Definition: rel.h:540
#define RelationIsValid(relation)
Definition: rel.h:479
#define RelationGetNamespace(relation)
Definition: rel.h:547
Relation RelationIdGetRelation(Oid relationId)
Definition: relcache.c:2052
void RelationClose(Relation relation)
Definition: relcache.c:2174
#define rbtxn_is_streamed(txn)
#define rbtxn_get_toptxn(txn)
#define rbtxn_is_subtxn(txn)
ReorderBufferChangeType
Definition: reorderbuffer.h:51
@ REORDER_BUFFER_CHANGE_INSERT
Definition: reorderbuffer.h:52
@ REORDER_BUFFER_CHANGE_DELETE
Definition: reorderbuffer.h:54
@ REORDER_BUFFER_CHANGE_UPDATE
Definition: reorderbuffer.h:53
Node * expand_generated_columns_in_expr(Node *node, Relation rel, int rt_index)
Definition: attmap.h:35
int16 attlen
Definition: tupdesc.h:71
char * defname
Definition: parsenodes.h:826
Node * arg
Definition: parsenodes.h:827
CommandId es_output_cid
Definition: execnodes.h:672
TupleTableSlot * ecxt_scantuple
Definition: execnodes.h:267
Definition: dynahash.c:220
Definition: pg_list.h:54
MemoryContext context
Definition: logical.h:36
StringInfo out
Definition: logical.h:71
void * output_plugin_private
Definition: logical.h:76
List * output_plugin_options
Definition: logical.h:59
LogicalDecodeStreamChangeCB stream_change_cb
LogicalDecodeMessageCB message_cb
LogicalDecodeStreamTruncateCB stream_truncate_cb
LogicalDecodeStreamMessageCB stream_message_cb
LogicalDecodeFilterByOriginCB filter_by_origin_cb
LogicalDecodeTruncateCB truncate_cb
LogicalDecodeStreamStopCB stream_stop_cb
LogicalDecodeStreamCommitCB stream_commit_cb
LogicalDecodeRollbackPreparedCB rollback_prepared_cb
LogicalDecodeStreamPrepareCB stream_prepare_cb
LogicalDecodeCommitPreparedCB commit_prepared_cb
LogicalDecodeStreamStartCB stream_start_cb
LogicalDecodePrepareCB prepare_cb
LogicalDecodeStartupCB startup_cb
LogicalDecodeCommitCB commit_cb
LogicalDecodeBeginCB begin_cb
LogicalDecodeStreamAbortCB stream_abort_cb
LogicalDecodeBeginPrepareCB begin_prepare_cb
LogicalDecodeChangeCB change_cb
LogicalDecodeShutdownCB shutdown_cb
OutputPluginOutputType output_type
Definition: output_plugin.h:28
bool sent_begin_txn
Definition: pgoutput.c:213
PublicationActions pubactions
RTEKind rtekind
Definition: parsenodes.h:1056
Form_pg_class rd_rel
Definition: rel.h:111
ExprState * exprstate[NUM_ROWFILTER_PUBACTIONS]
Definition: pgoutput.c:152
Bitmapset * columns
Definition: pgoutput.c:178
PublicationActions pubactions
Definition: pgoutput.c:143
TupleTableSlot * old_slot
Definition: pgoutput.c:155
PublishGencolsType include_gencols_type
Definition: pgoutput.c:138
bool replicate_valid
Definition: pgoutput.c:127
MemoryContext entry_cxt
Definition: pgoutput.c:184
EState * estate
Definition: pgoutput.c:153
TupleTableSlot * new_slot
Definition: pgoutput.c:154
List * streamed_txns
Definition: pgoutput.c:139
AttrMap * attrmap
Definition: pgoutput.c:171
struct ReorderBufferChange::@110::@112 truncate
ReorderBufferChangeType action
Definition: reorderbuffer.h:81
union ReorderBufferChange::@110 data
struct ReorderBufferTXN * txn
Definition: reorderbuffer.h:84
struct ReorderBufferChange::@110::@111 tp
RepOriginId origin_id
TimestampTz abort_time
void * output_plugin_private
XLogRecPtr origin_lsn
TransactionId xid
union ReorderBufferTXN::@116 xact_time
Definition: value.h:64
int tdrefcount
Definition: tupdesc.h:134
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:123
bool * tts_isnull
Definition: tuptable.h:127
Datum * tts_values
Definition: tuptable.h:125
Definition: regguts.h:323
char defGetStreamingMode(DefElem *def)
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:600
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:232
#define SearchSysCacheExists2(cacheId, key1, key2)
Definition: syscache.h:102
#define InvalidTransactionId
Definition: transam.h:31
#define FirstGenbkiObjectId
Definition: transam.h:195
TupleTableSlot * execute_attr_map_slot(AttrMap *attrMap, TupleTableSlot *in_slot, TupleTableSlot *out_slot)
Definition: tupconvert.c:192
TupleDesc CreateTupleDescCopyConstr(TupleDesc tupdesc)
Definition: tupdesc.c:322
void FreeTupleDesc(TupleDesc tupdesc)
Definition: tupdesc.c:479
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:154
static CompactAttribute * TupleDescCompactAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:169
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:454
static void slot_getallattrs(TupleTableSlot *slot)
Definition: tuptable.h:368
#define strVal(v)
Definition: value.h:82
#define VARATT_IS_EXTERNAL_ONDISK(PTR)
Definition: varatt.h:290
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3432
CommandId GetCurrentCommandId(bool used)
Definition: xact.c:828
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28