1/*-------------------------------------------------------------------------
2 *
3 * pgoutput.c
4 * Logical Replication output plugin
5 *
6 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/replication/pgoutput/pgoutput.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#include "access/tupconvert.h"
16#include "catalog/partition.h"
20#include "commands/defrem.h"
22#include "executor/executor.h"
23#include "fmgr.h"
24#include "nodes/makefuncs.h"
26#include "replication/logical.h"
28#include "replication/origin.h"
31#include "utils/builtins.h"
32#include "utils/inval.h"
33#include "utils/lsyscache.h"
34#include "utils/memutils.h"
35#include "utils/rel.h"
36#include "utils/syscache.h"
37#include "utils/varlena.h"
38
39PG_MODULE_MAGIC_EXT(
40 .name = "pgoutput",
41 .version = PG_VERSION
42);
43
44static void pgoutput_startup(LogicalDecodingContext *ctx,
45 OutputPluginOptions *opt, bool is_init);
46static void pgoutput_shutdown(LogicalDecodingContext *ctx);
47static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
48 ReorderBufferTXN *txn);
49static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
50 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
51static void pgoutput_change(LogicalDecodingContext *ctx,
52 ReorderBufferTXN *txn, Relation relation,
53 ReorderBufferChange *change);
54static void pgoutput_truncate(LogicalDecodingContext *ctx,
55 ReorderBufferTXN *txn, int nrelations, Relation relations[],
56 ReorderBufferChange *change);
57static void pgoutput_message(LogicalDecodingContext *ctx,
58 ReorderBufferTXN *txn, XLogRecPtr message_lsn,
59 bool transactional, const char *prefix,
60 Size sz, const char *message);
61static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
62 RepOriginId origin_id);
63static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
64 ReorderBufferTXN *txn);
65static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
66 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
67static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
68 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
69static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
70 ReorderBufferTXN *txn,
71 XLogRecPtr prepare_end_lsn,
72 TimestampTz prepare_time);
73static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
74 ReorderBufferTXN *txn);
75static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
76 ReorderBufferTXN *txn);
77static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
78 ReorderBufferTXN *txn,
79 XLogRecPtr abort_lsn);
80static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
81 ReorderBufferTXN *txn,
82 XLogRecPtr commit_lsn);
83static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
84 ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
85
86static bool publications_valid;
87
88static List *LoadPublications(List *pubnames);
89static void publication_invalidation_cb(Datum arg, int cacheid,
90 uint32 hashvalue);
91static void send_repl_origin(LogicalDecodingContext *ctx,
92 RepOriginId origin_id, XLogRecPtr origin_lsn,
93 bool send_origin);
94
95/*
96 * Only 3 publication actions are used for row filtering ("insert", "update",
97 * "delete"). See RelationSyncEntry.exprstate[].
98 */
99enum RowFilterPubAction
100{
101 PUBACTION_INSERT,
102 PUBACTION_UPDATE,
103 PUBACTION_DELETE,
104};
105
106#define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
107
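/*
 * Editor's note (illustrative, not part of the module): these values index
 * directly into RelationSyncEntry.exprstate[], so fetching the row filter
 * for a given operation is just an array lookup, e.g.
 *
 *     ExprState *filter = entry->exprstate[PUBACTION_UPDATE];
 *
 * There is deliberately no slot for TRUNCATE, since row filters do not
 * apply to it.
 */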
108/*
109 * Entry in the map used to remember which relation schemas we sent.
110 *
111 * The schema_sent flag determines if the current schema record for the
112 * relation (and for its ancestor if publish_as_relid is set) was already
113 * sent to the subscriber (in which case we don't need to send it again).
114 *
115 * The schema cache on the downstream is, however, updated only at commit
116 * time, and with streamed transactions the commit order may differ from
117 * the order in which the transactions are sent. Also, the (sub)transactions
118 * might get aborted, so we need to send the schema for each (sub)transaction
119 * so that we don't lose the schema information on abort. To handle this,
120 * we maintain a list of xids (streamed_txns) for which we have already sent
121 * the schema.
122 *
123 * For partitions, 'pubactions' considers not only the table's own
124 * publications, but also those of all of its ancestors.
125 */
126typedef struct RelationSyncEntry
127{
128 Oid relid; /* relation oid */
129
130 bool replicate_valid; /* overall validity flag for entry */
131
132 bool schema_sent;
133
134 /*
135 * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
136 * columns and the 'publish_generated_columns' parameter is set to
137 * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
138 * indicating that no generated columns should be published, unless
139 * explicitly specified in the column list.
140 */
141 PublishGencolsType include_gencols_type;
142 List *streamed_txns; /* streamed toplevel transactions with this
143 * schema */
144
145 /* are we publishing this rel? */
146 PublicationActions pubactions;
147
148 /*
149 * ExprState array for row filter. The expressions for different
150 * publication actions cannot always be combined into one, because
151 * updates and deletes restrict the columns used in the expression to
152 * those in the replica identity index, whereas inserts have no such
153 * restriction; so there is one ExprState per publication action.
154 */
155 ExprState *exprstate[NUM_ROWFILTER_PUBACTIONS];
156 EState *estate; /* executor state used for row filter */
157 TupleTableSlot *new_slot; /* slot for storing new tuple */
158 TupleTableSlot *old_slot; /* slot for storing old tuple */
159
160 /*
161 * OID of the relation to publish changes as. For a partition, this may
162 * be set to one of its ancestors whose schema will be used when
163 * replicating changes, if publish_via_partition_root is set for the
164 * publication.
165 */
166 Oid publish_as_relid;
167
168 /*
169 * Map used when replicating using an ancestor's schema to convert tuples
170 * from partition's type to the ancestor's; NULL if publish_as_relid is
171 * same as 'relid' or if unnecessary due to partition and the ancestor
172 * having identical TupleDesc.
173 */
174 AttrMap *attrmap;
175
176 /*
177 * Columns included in the publication, or NULL if all columns are
178 * included implicitly. Note that the attnums in this bitmap are not
179 * shifted by FirstLowInvalidHeapAttributeNumber.
180 */
181 Bitmapset *columns;
182
183 /*
184 * Private context to store additional data for this entry - state for the
185 * row filter expressions, column list, etc.
186 */
187 MemoryContext entry_cxt;
188} RelationSyncEntry;
189
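/*
 * Editor's sketch (assumed usage, see get_rel_sync_entry() below): entries
 * are kept in RelationSyncCache, a hash table keyed by the relation OID, so
 * the per-change lookup is essentially
 *
 *     RelationSyncEntry *entry = hash_search(RelationSyncCache, &relid,
 *                                            HASH_ENTER, &found);
 *     if (!entry->replicate_valid)
 *         ... rebuild publication info, row filter and column list ...
 *
 * with the invalidation callbacks marking entries invalid so that they get
 * rebuilt on next use.
 */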
190/*
191 * Maintain a per-transaction level variable to track whether the transaction
192 * has sent BEGIN. BEGIN is only sent when the first change in a transaction
193 * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
194 * messages for empty transactions which saves network bandwidth.
195 *
196 * This optimization is not used for prepared transactions because if the
197 * WALSender restarts after prepare of a transaction and before commit prepared
198 * of the same transaction then we won't be able to figure out if we have
199 * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
200 * because we would have lost the in-memory txndata information that was
201 * present prior to the restart. This will result in sending a spurious
202 * COMMIT PREPARED without a corresponding prepared transaction at the
203 * downstream which would lead to an error when it tries to process it.
204 *
205 * XXX We could achieve this optimization by changing protocol to send
206 * additional information so that downstream can detect that the corresponding
207 * prepare has not been sent. However, adding such a check for every
208 * transaction in the downstream could be costly so we might want to do it
209 * optionally.
210 *
211 * We also don't have this optimization for streamed transactions because
212 * they can contain prepared transactions.
213 */
214typedef struct PGOutputTxnData
215{
216 bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */
217} PGOutputTxnData;
218
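/*
 * Editor's sketch of the deferred-BEGIN pattern described above: every
 * data-carrying callback first does, in effect,
 *
 *     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
 *
 *     if (txndata && !txndata->sent_begin_txn)
 *         pgoutput_send_begin(ctx, txn);
 *
 * so a transaction that never produces a publishable change never sends
 * BEGIN, and pgoutput_commit_txn() then skips the matching COMMIT as well.
 */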
219/* Map used to remember which relation schemas we sent. */
220static HTAB *RelationSyncCache = NULL;
221
222static void init_rel_sync_cache(MemoryContext cachectx);
223static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
225 Relation relation);
226static void send_relation_and_attrs(Relation relation, TransactionId xid,
228 RelationSyncEntry *relentry);
229static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
230static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
231 uint32 hashvalue);
233 TransactionId xid);
235 TransactionId xid);
236static void init_tuple_slot(PGOutputData *data, Relation relation,
237 RelationSyncEntry *entry);
238
239/* row filter routines */
242 List *publications,
243 RelationSyncEntry *entry);
245 ExprContext *econtext);
246static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
247 TupleTableSlot **new_slot_ptr,
248 RelationSyncEntry *entry,
250
251/* column list routines */
253 List *publications,
254 RelationSyncEntry *entry);
255
256/*
257 * Specify output plugin callbacks
258 */
259void
260_PG_output_plugin_init(OutputPluginCallbacks *cb)
261{
262 cb->startup_cb = pgoutput_startup;
263 cb->begin_cb = pgoutput_begin_txn;
264 cb->change_cb = pgoutput_change;
265 cb->truncate_cb = pgoutput_truncate;
266 cb->message_cb = pgoutput_message;
267 cb->commit_cb = pgoutput_commit_txn;
268
269 cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
270 cb->prepare_cb = pgoutput_prepare_txn;
271 cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
272 cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
273 cb->filter_by_origin_cb = pgoutput_origin_filter;
274 cb->shutdown_cb = pgoutput_shutdown;
275
276 /* transaction streaming */
277 cb->stream_start_cb = pgoutput_stream_start;
278 cb->stream_stop_cb = pgoutput_stream_stop;
279 cb->stream_abort_cb = pgoutput_stream_abort;
280 cb->stream_commit_cb = pgoutput_stream_commit;
281 cb->stream_change_cb = pgoutput_change;
282 cb->stream_message_cb = pgoutput_message;
283 cb->stream_truncate_cb = pgoutput_truncate;
284 /* transaction streaming - two-phase commit */
285 cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
286}
287
288static void
289parse_output_parameters(List *options, PGOutputData *data)
290{
291 ListCell *lc;
292 bool protocol_version_given = false;
293 bool publication_names_given = false;
294 bool binary_option_given = false;
295 bool messages_option_given = false;
296 bool streaming_given = false;
297 bool two_phase_option_given = false;
298 bool origin_option_given = false;
299
300 data->binary = false;
301 data->streaming = LOGICALREP_STREAM_OFF;
302 data->messages = false;
303 data->two_phase = false;
304
305 foreach(lc, options)
306 {
307 DefElem *defel = (DefElem *) lfirst(lc);
308
309 Assert(defel->arg == NULL || IsA(defel->arg, String));
310
311 /* Check each param, whether or not we recognize it */
312 if (strcmp(defel->defname, "proto_version") == 0)
313 {
314 unsigned long parsed;
315 char *endptr;
316
317 if (protocol_version_given)
319 (errcode(ERRCODE_SYNTAX_ERROR),
320 errmsg("conflicting or redundant options")));
321 protocol_version_given = true;
322
323 errno = 0;
324 parsed = strtoul(strVal(defel->arg), &endptr, 10);
325 if (errno != 0 || *endptr != '\0')
327 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
328 errmsg("invalid proto_version")));
329
330 if (parsed > PG_UINT32_MAX)
332 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
333 errmsg("proto_version \"%s\" out of range",
334 strVal(defel->arg))));
335
336 data->protocol_version = (uint32) parsed;
337 }
338 else if (strcmp(defel->defname, "publication_names") == 0)
339 {
340 if (publication_names_given)
342 (errcode(ERRCODE_SYNTAX_ERROR),
343 errmsg("conflicting or redundant options")));
344 publication_names_given = true;
345
346 if (!SplitIdentifierString(strVal(defel->arg), ',',
347 &data->publication_names))
349 (errcode(ERRCODE_INVALID_NAME),
350 errmsg("invalid publication_names syntax")));
351 }
352 else if (strcmp(defel->defname, "binary") == 0)
353 {
354 if (binary_option_given)
356 (errcode(ERRCODE_SYNTAX_ERROR),
357 errmsg("conflicting or redundant options")));
358 binary_option_given = true;
359
360 data->binary = defGetBoolean(defel);
361 }
362 else if (strcmp(defel->defname, "messages") == 0)
363 {
364 if (messages_option_given)
366 (errcode(ERRCODE_SYNTAX_ERROR),
367 errmsg("conflicting or redundant options")));
368 messages_option_given = true;
369
370 data->messages = defGetBoolean(defel);
371 }
372 else if (strcmp(defel->defname, "streaming") == 0)
373 {
374 if (streaming_given)
376 (errcode(ERRCODE_SYNTAX_ERROR),
377 errmsg("conflicting or redundant options")));
378 streaming_given = true;
379
380 data->streaming = defGetStreamingMode(defel);
381 }
382 else if (strcmp(defel->defname, "two_phase") == 0)
383 {
384 if (two_phase_option_given)
386 (errcode(ERRCODE_SYNTAX_ERROR),
387 errmsg("conflicting or redundant options")));
388 two_phase_option_given = true;
389
390 data->two_phase = defGetBoolean(defel);
391 }
392 else if (strcmp(defel->defname, "origin") == 0)
393 {
394 char *origin;
395
396 if (origin_option_given)
398 errcode(ERRCODE_SYNTAX_ERROR),
399 errmsg("conflicting or redundant options"));
400 origin_option_given = true;
401
402 origin = defGetString(defel);
403 if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
404 data->publish_no_origin = true;
405 else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
406 data->publish_no_origin = false;
407 else
409 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
410 errmsg("unrecognized origin value: \"%s\"", origin));
411 }
412 else
413 elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
414 }
415
416 /* Check required options */
417 if (!protocol_version_given)
419 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
420 errmsg("option \"%s\" missing", "proto_version"));
421 if (!publication_names_given)
423 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
424 errmsg("option \"%s\" missing", "publication_names"));
425}
426
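/*
 * Editor's example (values are only illustrative): these options arrive from
 * the subscriber's walsender command, along the lines of
 *
 *     START_REPLICATION SLOT "mysub" LOGICAL 0/0 (
 *         proto_version '4',
 *         publication_names '"mypub","other_pub"',
 *         binary 'true',
 *         messages 'true',
 *         streaming 'parallel',
 *         origin 'none'
 *     )
 *
 * Only proto_version and publication_names are required, as enforced above;
 * the remaining options default to off.
 */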
427/*
428 * Initialize this plugin
429 */
430static void
431pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
432 bool is_init)
433{
435 static bool publication_callback_registered = false;
436
437 /* Create our memory context for private allocations. */
438 data->context = AllocSetContextCreate(ctx->context,
439 "logical replication output context",
441
442 data->cachectx = AllocSetContextCreate(ctx->context,
443 "logical replication cache context",
445
446 data->pubctx = AllocSetContextCreate(ctx->context,
447 "logical replication publication list context",
449
451
452 /* This plugin uses binary protocol. */
454
455 /*
456 * This is replication start and not slot initialization.
457 *
458 * Parse and validate options passed by the client.
459 */
460 if (!is_init)
461 {
462 /* Parse the params and ERROR if we see any we don't recognize */
464
465 /* Check if we support requested protocol */
466 if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
468 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
469 errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
470 data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
471
472 if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
474 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
475 errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
476 data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
477
478 /*
479 * Decide whether to enable streaming. It is disabled by default, in
480 * which case we just update the flag in decoding context. Otherwise
481 * we only allow it with sufficient version of the protocol, and when
482 * the output plugin supports it.
483 */
484 if (data->streaming == LOGICALREP_STREAM_OFF)
485 ctx->streaming = false;
486 else if (data->streaming == LOGICALREP_STREAM_ON &&
487 data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
489 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
490 errmsg("requested proto_version=%d does not support streaming, need %d or higher",
491 data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
492 else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
495 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
496 errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
498 else if (!ctx->streaming)
500 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
501 errmsg("streaming requested, but not supported by output plugin")));
502
503 /*
504 * Here, we just check whether the two-phase option is passed by
505 * plugin and decide whether to enable it at later point of time. It
506 * remains enabled if the previous start-up has done so. But we only
507 * allow the option to be passed in with sufficient version of the
508 * protocol, and when the output plugin supports it.
509 */
510 if (!data->two_phase)
511 ctx->twophase_opt_given = false;
512 else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
514 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
515 errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
516 data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
517 else if (!ctx->twophase)
519 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
520 errmsg("two-phase commit requested, but not supported by output plugin")));
521 else
522 ctx->twophase_opt_given = true;
523
524 /* Init publication state. */
525 data->publications = NIL;
526 publications_valid = false;
527
528 /*
529 * Register callback for pg_publication if we didn't already do that
530 * during some previous call in this process.
531 */
532 if (!publication_callback_registered)
533 {
534 CacheRegisterSyscacheCallback(PUBLICATIONOID,
536 (Datum) 0);
538 (Datum) 0);
539 publication_callback_registered = true;
540 }
541
542 /* Initialize relation schema cache. */
544 }
545 else
546 {
547 /*
548 * Disable the streaming and prepared transactions during the slot
549 * initialization mode.
550 */
551 ctx->streaming = false;
552 ctx->twophase = false;
553 }
554}
555
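/*
 * Editor's summary of the negotiation above (protocol constants assumed from
 * replication/logicalproto.h: streaming needs protocol 2, two-phase needs 3,
 * parallel streaming needs 4):
 *
 *     streaming 'off'      -> ctx->streaming = false
 *     streaming 'on'       -> requires proto_version >= 2 and plugin support
 *     streaming 'parallel' -> requires proto_version >= 4 and plugin support
 *     two_phase 'true'     -> requires proto_version >= 3 and ctx->twophase
 */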
556/*
557 * BEGIN callback.
558 *
559 * Don't send the BEGIN message here; instead, postpone it until the first
560 * change. In logical replication, a common scenario is to replicate only a
561 * subset of tables, so transactions whose changes touch only unpublished
562 * tables produce empty transactions. Sending BEGIN and COMMIT messages to
563 * subscribers for those would waste bandwidth on something of little or no
564 * use for logical replication.
565 */
566static void
568{
570 sizeof(PGOutputTxnData));
571
572 txn->output_plugin_private = txndata;
573}
574
575/*
576 * Send BEGIN.
577 *
578 * This is called while processing the first change of the transaction.
579 */
580static void
582{
583 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
585
586 Assert(txndata);
587 Assert(!txndata->sent_begin_txn);
588
589 OutputPluginPrepareWrite(ctx, !send_replication_origin);
590 logicalrep_write_begin(ctx->out, txn);
591 txndata->sent_begin_txn = true;
592
593 send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
594 send_replication_origin);
595
596 OutputPluginWrite(ctx, true);
597}
598
599/*
600 * COMMIT callback
601 */
602static void
604 XLogRecPtr commit_lsn)
605{
607 bool sent_begin_txn;
608
609 Assert(txndata);
610
611 /*
612 * We don't need to send the commit message unless some relevant change
613 * from this transaction has been sent to the downstream.
614 */
615 sent_begin_txn = txndata->sent_begin_txn;
616 OutputPluginUpdateProgress(ctx, !sent_begin_txn);
617 pfree(txndata);
618 txn->output_plugin_private = NULL;
619
620 if (!sent_begin_txn)
621 {
622 elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
623 return;
624 }
625
626 OutputPluginPrepareWrite(ctx, true);
627 logicalrep_write_commit(ctx->out, txn, commit_lsn);
628 OutputPluginWrite(ctx, true);
629}
630
631/*
632 * BEGIN PREPARE callback
633 */
634static void
636{
637 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
638
639 OutputPluginPrepareWrite(ctx, !send_replication_origin);
641
642 send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
643 send_replication_origin);
644
645 OutputPluginWrite(ctx, true);
646}
647
648/*
649 * PREPARE callback
650 */
651static void
653 XLogRecPtr prepare_lsn)
654{
655 OutputPluginUpdateProgress(ctx, false);
656
657 OutputPluginPrepareWrite(ctx, true);
658 logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
659 OutputPluginWrite(ctx, true);
660}
661
662/*
663 * COMMIT PREPARED callback
664 */
665static void
667 XLogRecPtr commit_lsn)
668{
669 OutputPluginUpdateProgress(ctx, false);
670
671 OutputPluginPrepareWrite(ctx, true);
672 logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
673 OutputPluginWrite(ctx, true);
674}
675
676/*
677 * ROLLBACK PREPARED callback
678 */
679static void
681 ReorderBufferTXN *txn,
682 XLogRecPtr prepare_end_lsn,
683 TimestampTz prepare_time)
684{
685 OutputPluginUpdateProgress(ctx, false);
686
687 OutputPluginPrepareWrite(ctx, true);
688 logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
689 prepare_time);
690 OutputPluginWrite(ctx, true);
691}
692
693/*
694 * Write the current schema of the relation and its ancestor (if any) if not
695 * done yet.
696 */
697static void
699 ReorderBufferChange *change,
700 Relation relation, RelationSyncEntry *relentry)
701{
703 bool schema_sent;
706
707 /*
708 * Remember XID of the (sub)transaction for the change. We don't care if
709 * it's top-level transaction or not (we have already sent that XID in
710 * start of the current streaming block).
711 *
712 * If we're not in a streaming block, just use InvalidTransactionId and
713 * the write methods will not include it.
714 */
715 if (data->in_streaming)
716 xid = change->txn->xid;
717
718 if (rbtxn_is_subtxn(change->txn))
719 topxid = rbtxn_get_toptxn(change->txn)->xid;
720 else
721 topxid = xid;
722
723 /*
724 * Do we need to send the schema? We do track streamed transactions
725 * separately, because those may be applied later (and the regular
726 * transactions won't see their effects until then) and in an order that
727 * we don't know at this point.
728 *
729 * XXX There is a scope of optimization here. Currently, we always send
730 * the schema first time in a streaming transaction but we can probably
731 * avoid that by checking 'relentry->schema_sent' flag. However, before
732 * doing that we need to study its impact on the case where we have a mix
733 * of streaming and non-streaming transactions.
734 */
735 if (data->in_streaming)
736 schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
737 else
738 schema_sent = relentry->schema_sent;
739
740 /* Nothing to do if we already sent the schema. */
741 if (schema_sent)
742 return;
743
744 /*
745 * Send the schema. If the changes will be published using an ancestor's
746 * schema, not the relation's own, send that ancestor's schema before
747 * sending relation's own (XXX - maybe sending only the former suffices?).
748 */
749 if (relentry->publish_as_relid != RelationGetRelid(relation))
750 {
751 Relation ancestor = RelationIdGetRelation(relentry->publish_as_relid);
752
753 send_relation_and_attrs(ancestor, xid, ctx, relentry);
754 RelationClose(ancestor);
755 }
756
757 send_relation_and_attrs(relation, xid, ctx, relentry);
758
759 if (data->in_streaming)
760 set_schema_sent_in_streamed_txn(relentry, topxid);
761 else
762 relentry->schema_sent = true;
763}
764
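/*
 * Editor's note (illustrative): for a streamed transaction the "schema
 * already sent?" test above reduces to a membership check on the entry's
 * xid list, roughly
 *
 *     if (!list_member_xid(entry->streamed_txns, topxid))
 *         ... send the Relation/Type messages again for this xid ...
 *
 * which is why the abort of one streamed (sub)transaction cannot leave a
 * later transaction without its schema on the subscriber.
 */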
765/*
766 * Sends a relation
767 */
768static void
771 RelationSyncEntry *relentry)
772{
773 TupleDesc desc = RelationGetDescr(relation);
774 Bitmapset *columns = relentry->columns;
775 PublishGencolsType include_gencols_type = relentry->include_gencols_type;
776 int i;
777
778 /*
779 * Write out type info if needed. We do that only for user-created types.
780 * We use FirstGenbkiObjectId as the cutoff, so that we only consider
781 * objects with hand-assigned OIDs to be "built in", not for instance any
782 * function or type defined in the information_schema. This is important
783 * because only hand-assigned OIDs can be expected to remain stable across
784 * major versions.
785 */
786 for (i = 0; i < desc->natts; i++)
787 {
788 Form_pg_attribute att = TupleDescAttr(desc, i);
789
790 if (!logicalrep_should_publish_column(att, columns,
791 include_gencols_type))
792 continue;
793
794 if (att->atttypid < FirstGenbkiObjectId)
795 continue;
796
797 OutputPluginPrepareWrite(ctx, false);
798 logicalrep_write_typ(ctx->out, xid, att->atttypid);
799 OutputPluginWrite(ctx, false);
800 }
801
802 OutputPluginPrepareWrite(ctx, false);
803 logicalrep_write_rel(ctx->out, xid, relation, columns,
804 include_gencols_type);
805 OutputPluginWrite(ctx, false);
806}
807
808/*
809 * Executor state preparation for evaluation of row filter expressions for the
810 * specified relation.
811 */
812static EState *
814{
815 EState *estate;
816 RangeTblEntry *rte;
817 List *perminfos = NIL;
818
819 estate = CreateExecutorState();
820
821 rte = makeNode(RangeTblEntry);
822 rte->rtekind = RTE_RELATION;
823 rte->relid = RelationGetRelid(rel);
824 rte->relkind = rel->rd_rel->relkind;
825 rte->rellockmode = AccessShareLock;
826
827 addRTEPermissionInfo(&perminfos, rte);
828
829 ExecInitRangeTable(estate, list_make1(rte), perminfos,
831
832 estate->es_output_cid = GetCurrentCommandId(false);
833
834 return estate;
835}
836
837/*
838 * Evaluates row filter.
839 *
840 * If the row filter evaluates to NULL, it is taken as false i.e. the change
841 * isn't replicated.
842 */
843static bool
845{
846 Datum ret;
847 bool isnull;
848
849 Assert(state != NULL);
850
851 ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
852
853 elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
854 isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
855 isnull ? "true" : "false");
856
857 if (isnull)
858 return false;
859
860 return DatumGetBool(ret);
861}
862
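/*
 * Editor's example: given a publication defined (hypothetically) as
 *
 *     CREATE PUBLICATION pub FOR TABLE items WHERE (price > 100);
 *
 * a row whose price is NULL makes the filter expression evaluate to NULL,
 * which the function above treats as "no match", so the change is not
 * replicated.
 */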
863/*
864 * Make sure the per-entry memory context exists.
865 */
866static void
868{
869 Relation relation;
870
871 /* The context may already exist, in which case bail out. */
872 if (entry->entry_cxt)
873 return;
874
875 relation = RelationIdGetRelation(entry->publish_as_relid);
876
877 entry->entry_cxt = AllocSetContextCreate(data->cachectx,
878 "entry private context",
880
882 RelationGetRelationName(relation));
883}
884
885/*
886 * Initialize the row filter.
887 */
888static void
890 RelationSyncEntry *entry)
891{
892 ListCell *lc;
893 List *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
894 bool no_filter[] = {false, false, false}; /* One per pubaction */
895 MemoryContext oldctx;
896 int idx;
897 bool has_filter = true;
898 Oid schemaid = get_rel_namespace(entry->publish_as_relid);
899
900 /*
901 * Find if there are any row filters for this relation. If there are, then
902 * prepare the necessary ExprState and cache it in entry->exprstate. To
903 * build an expression state, we need to ensure the following:
904 *
905 * All the given publication-table mappings must be checked.
906 *
907 * Multiple publications might have multiple row filters for this
908 * relation. Since row filter usage depends on the DML operation, there
909 * are multiple lists (one for each operation) to which row filters will
910 * be appended.
911 *
912 * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
913 * expression" so it takes precedence.
914 */
915 foreach(lc, publications)
916 {
917 Publication *pub = lfirst(lc);
918 HeapTuple rftuple = NULL;
919 Datum rfdatum = 0;
920 bool pub_no_filter = true;
921
922 /*
923 * If the publication is FOR ALL TABLES, or the publication includes a
924 * FOR TABLES IN SCHEMA where the table belongs to the referred
925 * schema, then it is treated the same as if there are no row filters
926 * (even if other publications have a row filter).
927 */
928 if (!pub->alltables &&
929 !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
930 ObjectIdGetDatum(schemaid),
931 ObjectIdGetDatum(pub->oid)))
932 {
933 /*
934 * Check for the presence of a row filter in this publication.
935 */
936 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
938 ObjectIdGetDatum(pub->oid));
939
940 if (HeapTupleIsValid(rftuple))
941 {
942 /* Null indicates no filter. */
943 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
944 Anum_pg_publication_rel_prqual,
945 &pub_no_filter);
946 }
947 }
948
949 if (pub_no_filter)
950 {
951 if (rftuple)
952 ReleaseSysCache(rftuple);
953
954 no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
955 no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
956 no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
957
958 /*
959 * Quick exit if all the DML actions are published (and hence
960 * unfiltered) via this publication.
961 */
962 if (no_filter[PUBACTION_INSERT] &&
963 no_filter[PUBACTION_UPDATE] &&
964 no_filter[PUBACTION_DELETE])
965 {
966 has_filter = false;
967 break;
968 }
969
970 /* No additional work for this publication. Next one. */
971 continue;
972 }
973
974 /* Form the per pubaction row filter lists. */
975 if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
976 rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
977 TextDatumGetCString(rfdatum));
978 if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
979 rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
980 TextDatumGetCString(rfdatum));
981 if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
982 rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
983 TextDatumGetCString(rfdatum));
984
985 ReleaseSysCache(rftuple);
986 } /* loop all subscribed publications */
987
988 /* Clean the row filter */
989 for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
990 {
991 if (no_filter[idx])
992 {
993 list_free_deep(rfnodes[idx]);
994 rfnodes[idx] = NIL;
995 }
996 }
997
998 if (has_filter)
999 {
1001
1003
1004 /*
1005 * Now all the filters for all pubactions are known. Combine them when
1006 * their pubactions are the same.
1007 */
1008 oldctx = MemoryContextSwitchTo(entry->entry_cxt);
1009 entry->estate = create_estate_for_relation(relation);
1010 for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
1011 {
1012 List *filters = NIL;
1013 Expr *rfnode;
1014
1015 if (rfnodes[idx] == NIL)
1016 continue;
1017
1018 foreach(lc, rfnodes[idx])
1019 filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
1020
1021 /* combine the row filter and cache the ExprState */
1022 rfnode = make_orclause(filters);
1023 entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
1024 } /* for each pubaction */
1025 MemoryContextSwitchTo(oldctx);
1026
1027 RelationClose(relation);
1028 }
1029}
1030
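/*
 * Editor's worked example (hypothetical publications): if the subscription
 * uses two publications that both publish UPDATE for this table, one with
 * WHERE (a > 10) and another with WHERE (b = 5), the code above ORs them
 * into a single expression, roughly
 *
 *     (a > 10) OR (b = 5)
 *
 * and caches its ExprState in entry->exprstate[PUBACTION_UPDATE].  Had a
 * third publication covered the table FOR ALL TABLES, the UPDATE filter
 * would have been dropped entirely (no_filter wins).
 */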
1031/*
1032 * If the table contains a generated column, check for any conflicting
1033 * values of 'publish_generated_columns' parameter in the publications.
1034 */
1035static void
1037 RelationSyncEntry *entry)
1038{
1040 TupleDesc desc = RelationGetDescr(relation);
1041 bool gencolpresent = false;
1042 bool first = true;
1043
1044 /* Check if there is any generated column present. */
1045 for (int i = 0; i < desc->natts; i++)
1046 {
1047 Form_pg_attribute att = TupleDescAttr(desc, i);
1048
1049 if (att->attgenerated)
1050 {
1051 gencolpresent = true;
1052 break;
1053 }
1054 }
1055
1056 /* There are no generated columns to be published. */
1057 if (!gencolpresent)
1058 {
1059 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
1060 return;
1061 }
1062
1063 /*
1064 * There may be a conflicting value for 'publish_generated_columns'
1065 * parameter in the publications.
1066 */
1067 foreach_ptr(Publication, pub, publications)
1068 {
1069 /*
1070 * The column list takes precedence over the
1071 * 'publish_generated_columns' parameter. Those will be checked later,
1072 * see pgoutput_column_list_init.
1073 */
1074 if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
1075 continue;
1076
1077 if (first)
1078 {
1079 entry->include_gencols_type = pub->pubgencols_type;
1080 first = false;
1081 }
1082 else if (entry->include_gencols_type != pub->pubgencols_type)
1083 ereport(ERROR,
1084 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1085 errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
1087 RelationGetRelationName(relation)));
1088 }
1089}
1090
1091/*
1092 * Initialize the column list.
1093 */
1094static void
1096 RelationSyncEntry *entry)
1097{
1098 ListCell *lc;
1099 bool first = true;
1101 bool found_pub_collist = false;
1102 Bitmapset *relcols = NULL;
1103
1105
1106 /*
1107 * Find if there are any column lists for this relation. If there are,
1108 * build a bitmap using the column lists.
1109 *
1110 * Multiple publications might have multiple column lists for this
1111 * relation.
1112 *
1113 * Note that we don't support the case where the column list is different
1114 * for the same table when combining publications. See comments atop
1115 * fetch_table_list. But one can later change the publication so we still
1116 * need to check all the given publication-table mappings and report an
1117 * error if any publications have a different column list.
1118 */
1119 foreach(lc, publications)
1120 {
1121 Publication *pub = lfirst(lc);
1122 Bitmapset *cols = NULL;
1123
1124 /* Retrieve the bitmap of columns for a column list publication. */
1125 found_pub_collist |= check_and_fetch_column_list(pub,
1126 entry->publish_as_relid,
1127 entry->entry_cxt, &cols);
1128
1129 /*
1130 * For non-column list publications — e.g. TABLE (without a column
1131 * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
1132 * of the table (including generated columns when
1133 * 'publish_generated_columns' parameter is true).
1134 */
1135 if (!cols)
1136 {
1137 /*
1138 * Cache the table columns for the first publication with no
1139 * specified column list to detect publication with a different
1140 * column list.
1141 */
1142 if (!relcols && (list_length(publications) > 1))
1143 {
1145
1146 relcols = pub_form_cols_map(relation,
1147 entry->include_gencols_type);
1148 MemoryContextSwitchTo(oldcxt);
1149 }
1150
1151 cols = relcols;
1152 }
1153
1154 if (first)
1155 {
1156 entry->columns = cols;
1157 first = false;
1158 }
1159 else if (!bms_equal(entry->columns, cols))
1160 ereport(ERROR,
1161 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1162 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
1164 RelationGetRelationName(relation)));
1165 } /* loop all subscribed publications */
1166
1167 /*
1168 * If no column list publications exist, columns to be published will be
1169 * computed later according to the 'publish_generated_columns' parameter.
1170 */
1171 if (!found_pub_collist)
1172 entry->columns = NULL;
1173
1174 RelationClose(relation);
1175}
1176
1177/*
1178 * Initialize the slot for storing new and old tuples, and build the map that
1179 * will be used to convert the relation's tuples into the ancestor's format.
1180 */
1181static void
1183 RelationSyncEntry *entry)
1184{
1185 MemoryContext oldctx;
1186 TupleDesc oldtupdesc;
1187 TupleDesc newtupdesc;
1188
1189 oldctx = MemoryContextSwitchTo(data->cachectx);
1190
1191 /*
1192 * Create tuple table slots. Create a copy of the TupleDesc as it needs to
1193 * live as long as the cache remains.
1194 */
1195 oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1196 newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
1197
1198 entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
1199 entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
1200
1201 MemoryContextSwitchTo(oldctx);
1202
1203 /*
1204 * Cache the map that will be used to convert the relation's tuples into
1205 * the ancestor's format, if needed.
1206 */
1207 if (entry->publish_as_relid != RelationGetRelid(relation))
1208 {
1210 TupleDesc indesc = RelationGetDescr(relation);
1211 TupleDesc outdesc = RelationGetDescr(ancestor);
1212
1213 /* Map must live as long as the logical decoding context. */
1214 oldctx = MemoryContextSwitchTo(data->cachectx);
1215
1216 entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
1217
1218 MemoryContextSwitchTo(oldctx);
1219 RelationClose(ancestor);
1220 }
1221}
1222
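/*
 * Editor's note (illustrative): the map built above matters when a
 * partition's physical column layout differs from its root's, for example
 * when a partition was attached with columns in a different order or after
 * column drops; execute_attr_map_slot() then reorders each tuple into the
 * root's layout before it is written to the protocol stream.
 */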
1223/*
1224 * Change is checked against the row filter if any.
1225 *
1226 * Returns true if the change is to be replicated, else false.
1227 *
1228 * For inserts, evaluate the row filter for new tuple.
1229 * For deletes, evaluate the row filter for old tuple.
1230 * For updates, evaluate the row filter for old and new tuple.
1231 *
1232 * For updates, if both evaluations are true, we allow sending the UPDATE and
1233 * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
1234 * only one of the tuples matches the row filter expression, we transform
1235 * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
1236 * following rules:
1237 *
1238 * Case 1: old-row (no match) new-row (no match) -> (drop change)
1239 * Case 2: old-row (no match) new row (match) -> INSERT
1240 * Case 3: old-row (match) new-row (no match) -> DELETE
1241 * Case 4: old-row (match) new row (match) -> UPDATE
1242 *
1243 * The new action is updated in the action parameter.
1244 *
1245 * The new slot could be updated when transforming the UPDATE into INSERT,
1246 * because the original new tuple might not have column values from the replica
1247 * identity.
1248 *
1249 * Examples:
1250 * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
1251 * Since the old tuple satisfies, the initial table synchronization copied this
1252 * row (or another method was used to guarantee that there is data
1253 * consistency). However, after the UPDATE the new tuple doesn't satisfy the
1254 * row filter, so from a data consistency perspective, that row should be
1255 * removed on the subscriber. The UPDATE should be transformed into a DELETE
1256 * statement and be sent to the subscriber. Keeping this row on the subscriber
1257 * is undesirable because it doesn't reflect what was defined in the row filter
1258 * expression on the publisher. This row on the subscriber would likely not be
1259 * modified by replication again. If someone inserted a new row with the same
1260 * old identifier, replication could stop due to a constraint violation.
1261 *
1262 * Let's say the old tuple doesn't match the row filter but the new tuple does.
1263 * Since the old tuple doesn't satisfy, the initial table synchronization
1264 * probably didn't copy this row. However, after the UPDATE the new tuple does
1265 * satisfy the row filter, so from a data consistency perspective, that row
1266 * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
1267 * statements have no effect (it matches no row -- see
1268 * apply_handle_update_internal()). So, the UPDATE should be transformed into an
1269 * INSERT statement and be sent to the subscriber. However, this might surprise
1270 * someone who expects the data set to satisfy the row filter expression on the
1271 * provider.
1272 */
1273static bool
1275 TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
1277{
1278 TupleDesc desc;
1279 int i;
1280 bool old_matched,
1281 new_matched,
1282 result;
1283 TupleTableSlot *tmp_new_slot;
1284 TupleTableSlot *new_slot = *new_slot_ptr;
1285 ExprContext *ecxt;
1286 ExprState *filter_exprstate;
1287
1288 /*
1289 * We need this map to avoid relying on ReorderBufferChangeType enums
1290 * having specific values.
1291 */
1292 static const int map_changetype_pubaction[] = {
1296 };
1297
1301
1302 Assert(new_slot || old_slot);
1303
1304 /* Get the corresponding row filter */
1305 filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
1306
1307 /* Bail out if there is no row filter */
1308 if (!filter_exprstate)
1309 return true;
1310
1311 elog(DEBUG3, "table \"%s.%s\" has row filter",
1313 RelationGetRelationName(relation));
1314
1316
1317 ecxt = GetPerTupleExprContext(entry->estate);
1318
1319 /*
1320 * For the following occasions where there is only one tuple, we can
1321 * evaluate the row filter for that tuple and return.
1322 *
1323 * For inserts, we only have the new tuple.
1324 *
1325 * For updates, we can have only a new tuple when none of the replica
1326 * identity columns changed and none of those columns have external data
1327 * but we still need to evaluate the row filter for the new tuple as the
1328 * existing values of those columns might not match the filter. Also,
1329 * users can use constant expressions in the row filter, so we anyway need
1330 * to evaluate it for the new tuple.
1331 *
1332 * For deletes, we only have the old tuple.
1333 */
1334 if (!new_slot || !old_slot)
1335 {
1336 ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
1337 result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1338
1339 return result;
1340 }
1341
1342 /*
1343 * Both the old and new tuples must be valid only for updates and need to
1344 * be checked against the row filter.
1345 */
1346 Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
1347
1348 slot_getallattrs(new_slot);
1349 slot_getallattrs(old_slot);
1350
1351 tmp_new_slot = NULL;
1352 desc = RelationGetDescr(relation);
1353
1354 /*
1355 * The new tuple might not have all the replica identity columns, in which
1356 * case it needs to be copied over from the old tuple.
1357 */
1358 for (i = 0; i < desc->natts; i++)
1359 {
1361
1362 /*
1363 * if the column in the new tuple or old tuple is null, nothing to do
1364 */
1365 if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
1366 continue;
1367
1368 /*
1369 * Unchanged toasted replica identity columns are only logged in the
1370 * old tuple. Copy this over to the new tuple. The changed (or WAL
1371 * Logged) toast values are always assembled in memory and set as
1372 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
1373 */
1374 if (att->attlen == -1 &&
1377 {
1378 if (!tmp_new_slot)
1379 {
1380 tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
1381 ExecClearTuple(tmp_new_slot);
1382
1383 memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
1384 desc->natts * sizeof(Datum));
1385 memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
1386 desc->natts * sizeof(bool));
1387 }
1388
1389 tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
1390 tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
1391 }
1392 }
1393
1394 ecxt->ecxt_scantuple = old_slot;
1395 old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1396
1397 if (tmp_new_slot)
1398 {
1399 ExecStoreVirtualTuple(tmp_new_slot);
1400 ecxt->ecxt_scantuple = tmp_new_slot;
1401 }
1402 else
1403 ecxt->ecxt_scantuple = new_slot;
1404
1405 new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
1406
1407 /*
1408 * Case 1: if both tuples don't match the row filter, bailout. Send
1409 * nothing.
1410 */
1411 if (!old_matched && !new_matched)
1412 return false;
1413
1414 /*
1415 * Case 2: if the old tuple doesn't satisfy the row filter but the new
1416 * tuple does, transform the UPDATE into INSERT.
1417 *
1418 * Use the newly transformed tuple that must contain the column values for
1419 * all the replica identity columns. This is required to ensure that
1420 * while inserting the tuple in the downstream node, we have all the
1421 * required column values.
1422 */
1423 if (!old_matched && new_matched)
1424 {
1426
1427 if (tmp_new_slot)
1428 *new_slot_ptr = tmp_new_slot;
1429 }
1430
1431 /*
1432 * Case 3: if the old tuple satisfies the row filter but the new tuple
1433 * doesn't, transform the UPDATE into DELETE.
1434 *
1435 * This transformation does not require another tuple. The old tuple will
1436 * be used for DELETE.
1437 */
1438 else if (old_matched && !new_matched)
1440
1441 /*
1442 * Case 4: if both tuples match the row filter, transformation isn't
1443 * required. (*action is default UPDATE).
1444 */
1445
1446 return true;
1447}
1448
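/*
 * Editor's worked example (hypothetical filter WHERE (price > 100)) of the
 * four cases above:
 *
 *     UPDATE items SET price = 200 WHERE price = 50;   -- old: no,  new: yes -> sent as INSERT
 *     UPDATE items SET price = 60  WHERE price = 150;  -- old: yes, new: no  -> sent as DELETE
 *     UPDATE items SET price = 300 WHERE price = 150;  -- old: yes, new: yes -> sent as UPDATE
 *     UPDATE items SET price = 70  WHERE price = 50;   -- old: no,  new: no  -> not sent
 */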
1449/*
1450 * Sends the decoded DML over wire.
1451 *
1452 * This is called both in streaming and non-streaming modes.
1453 */
1454static void
1456 Relation relation, ReorderBufferChange *change)
1457{
1460 MemoryContext old;
1461 RelationSyncEntry *relentry;
1463 Relation ancestor = NULL;
1464 Relation targetrel = relation;
1466 TupleTableSlot *old_slot = NULL;
1467 TupleTableSlot *new_slot = NULL;
1468
1469 if (!is_publishable_relation(relation))
1470 return;
1471
1472 /*
1473 * Remember the xid for the change in streaming mode. We need to send xid
1474 * with each change in the streaming mode so that subscriber can make
1475 * their association and on aborts, it can discard the corresponding
1476 * changes.
1477 */
1478 if (data->in_streaming)
1479 xid = change->txn->xid;
1480
1481 relentry = get_rel_sync_entry(data, relation);
1482
1483 /* First check the table filter */
1484 switch (action)
1485 {
1487 if (!relentry->pubactions.pubinsert)
1488 return;
1489 break;
1491 if (!relentry->pubactions.pubupdate)
1492 return;
1493 break;
1495 if (!relentry->pubactions.pubdelete)
1496 return;
1497
1498 /*
1499 * This is only possible if deletes are allowed even when replica
1500 * identity is not defined for a table. Since the DELETE action
1501 * can't be published, we simply return.
1502 */
1503 if (!change->data.tp.oldtuple)
1504 {
1505 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
1506 return;
1507 }
1508 break;
1509 default:
1510 Assert(false);
1511 }
1512
1513 /* Avoid leaking memory by using and resetting our own context */
1514 old = MemoryContextSwitchTo(data->context);
1515
1516 /* Switch relation if publishing via root. */
1517 if (relentry->publish_as_relid != RelationGetRelid(relation))
1518 {
1519 Assert(relation->rd_rel->relispartition);
1520 ancestor = RelationIdGetRelation(relentry->publish_as_relid);
1521 targetrel = ancestor;
1522 }
1523
1524 if (change->data.tp.oldtuple)
1525 {
1526 old_slot = relentry->old_slot;
1527 ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
1528
1529 /* Convert tuple if needed. */
1530 if (relentry->attrmap)
1531 {
1533 &TTSOpsVirtual);
1534
1535 old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
1536 }
1537 }
1538
1539 if (change->data.tp.newtuple)
1540 {
1541 new_slot = relentry->new_slot;
1542 ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
1543
1544 /* Convert tuple if needed. */
1545 if (relentry->attrmap)
1546 {
1548 &TTSOpsVirtual);
1549
1550 new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
1551 }
1552 }
1553
1554 /*
1555 * Check row filter.
1556 *
1557 * Updates could be transformed to inserts or deletes based on the results
1558 * of the row filter for old and new tuple.
1559 */
1560 if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
1561 goto cleanup;
1562
1563 /*
1564 * Send BEGIN if we haven't yet.
1565 *
1566 * We send the BEGIN message after ensuring that we will actually send the
1567 * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
1568 * transactions.
1569 */
1570 if (txndata && !txndata->sent_begin_txn)
1571 pgoutput_send_begin(ctx, txn);
1572
1573 /*
1574 * Schema should be sent using the original relation because it also sends
1575 * the ancestor's relation.
1576 */
1577 maybe_send_schema(ctx, change, relation, relentry);
1578
1579 OutputPluginPrepareWrite(ctx, true);
1580
1581 /* Send the data */
1582 switch (action)
1583 {
1585 logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
1586 data->binary, relentry->columns,
1587 relentry->include_gencols_type);
1588 break;
1590 logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
1591 new_slot, data->binary, relentry->columns,
1592 relentry->include_gencols_type);
1593 break;
1595 logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
1596 data->binary, relentry->columns,
1597 relentry->include_gencols_type);
1598 break;
1599 default:
1600 Assert(false);
1601 }
1602
1603 OutputPluginWrite(ctx, true);
1604
1605cleanup:
1606 if (RelationIsValid(ancestor))
1607 {
1608 RelationClose(ancestor);
1609 ancestor = NULL;
1610 }
1611
1612 /* Drop the new slots that were used to store the converted tuples. */
1613 if (relentry->attrmap)
1614 {
1615 if (old_slot)
1617
1618 if (new_slot)
1620 }
1621
1623 MemoryContextReset(data->context);
1624}
1625
1626static void
1628 int nrelations, Relation relations[], ReorderBufferChange *change)
1629{
1632 MemoryContext old;
1633 RelationSyncEntry *relentry;
1634 int i;
1635 int nrelids;
1636 Oid *relids;
1638
1639 /* Remember the xid for the change in streaming mode. See pgoutput_change. */
1640 if (data->in_streaming)
1641 xid = change->txn->xid;
1642
1643 old = MemoryContextSwitchTo(data->context);
1644
1645 relids = palloc0(nrelations * sizeof(Oid));
1646 nrelids = 0;
1647
1648 for (i = 0; i < nrelations; i++)
1649 {
1650 Relation relation = relations[i];
1651 Oid relid = RelationGetRelid(relation);
1652
1653 if (!is_publishable_relation(relation))
1654 continue;
1655
1656 relentry = get_rel_sync_entry(data, relation);
1657
1658 if (!relentry->pubactions.pubtruncate)
1659 continue;
1660
1661 /*
1662 * Don't send partitions if the publication wants to send only the
1663 * root tables through it.
1664 */
1665 if (relation->rd_rel->relispartition &&
1666 relentry->publish_as_relid != relid)
1667 continue;
1668
1669 relids[nrelids++] = relid;
1670
1671 /* Send BEGIN if we haven't yet */
1672 if (txndata && !txndata->sent_begin_txn)
1673 pgoutput_send_begin(ctx, txn);
1674
1675 maybe_send_schema(ctx, change, relation, relentry);
1676 }
1677
1678 if (nrelids > 0)
1679 {
1680 OutputPluginPrepareWrite(ctx, true);
1682 xid,
1683 nrelids,
1684 relids,
1685 change->data.truncate.cascade,
1686 change->data.truncate.restart_seqs);
1687 OutputPluginWrite(ctx, true);
1688 }
1689
1691 MemoryContextReset(data->context);
1692}
1693
1694static void
1696 XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
1697 const char *message)
1698{
1701
1702 if (!data->messages)
1703 return;
1704
1705 /*
1706 * Remember the xid for the message in streaming mode. See
1707 * pgoutput_change.
1708 */
1709 if (data->in_streaming)
1710 xid = txn->xid;
1711
1712 /*
1713 * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
1714 */
1715 if (transactional)
1716 {
1718
1719 /* Send BEGIN if we haven't yet */
1720 if (txndata && !txndata->sent_begin_txn)
1721 pgoutput_send_begin(ctx, txn);
1722 }
1723
1724 OutputPluginPrepareWrite(ctx, true);
1726 xid,
1727 message_lsn,
1728 transactional,
1729 prefix,
1730 sz,
1731 message);
1732 OutputPluginWrite(ctx, true);
1733}
1734
1735/*
1736 * Return true if the data is associated with an origin and the user has
1737 * requested the changes that don't have an origin, false otherwise.
1738 */
1739static bool
1741 RepOriginId origin_id)
1742{
1744
1745 if (data->publish_no_origin && origin_id != InvalidRepOriginId)
1746 return true;
1747
1748 return false;
1749}
1750
1751/*
1752 * Shutdown the output plugin.
1753 *
1754 * Note, we don't need to clean the data->context, data->cachectx, and
1755 * data->pubctx as they are child contexts of the ctx->context so they
1756 * will be cleaned up by logical decoding machinery.
1757 */
1758static void
1760{
1762 {
1764 RelationSyncCache = NULL;
1765 }
1766}
1767
1768/*
1769 * Load publications from the list of publication names.
1770 *
1771 * Here, we skip the publications that don't exist yet. This allows us
1772 * to continue replication silently even though a specified publication is
1773 * missing. This is required because we allow users to create publications
1774 * after having specified them at the time replication started.
1775 */
1776static List *
1778{
1779 List *result = NIL;
1780 ListCell *lc;
1781
1782 foreach(lc, pubnames)
1783 {
1784 char *pubname = (char *) lfirst(lc);
1785 Publication *pub = GetPublicationByName(pubname, true);
1786
1787 if (pub)
1788 result = lappend(result, pub);
1789 else
1791 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1792 errmsg("skipped loading publication: %s", pubname),
1793 errdetail("The publication does not exist at this point in the WAL."),
1794 errhint("Create the publication if it does not exist."));
1795 }
1796
1797 return result;
1798}
1799
1800/*
1801 * Publication syscache invalidation callback.
1802 *
1803 * Called for invalidations on pg_publication.
1804 */
1805static void
1807{
1808 publications_valid = false;
1809}
1810
1811/*
1812 * START STREAM callback
1813 */
1814static void
1816 ReorderBufferTXN *txn)
1817{
1819 bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
1820
1821 /* we can't nest streaming of transactions */
1822 Assert(!data->in_streaming);
1823
1824 /*
1825 * If we already sent the first stream for this transaction then don't
1826 * send the origin id in the subsequent streams.
1827 */
1828 if (rbtxn_is_streamed(txn))
1829 send_replication_origin = false;
1830
1831 OutputPluginPrepareWrite(ctx, !send_replication_origin);
1833
1835 send_replication_origin);
1836
1837 OutputPluginWrite(ctx, true);
1838
1839 /* we're streaming a chunk of transaction now */
1840 data->in_streaming = true;
1841}
1842
1843/*
1844 * STOP STREAM callback
1845 */
1846static void
1848 ReorderBufferTXN *txn)
1849{
1851
1852 /* we should be streaming a transaction */
1853 Assert(data->in_streaming);
1854
1855 OutputPluginPrepareWrite(ctx, true);
1857 OutputPluginWrite(ctx, true);
1858
1859 /* we've stopped streaming a transaction */
1860 data->in_streaming = false;
1861}
1862
1863/*
1864 * Notify downstream to discard the streamed transaction (along with all
1865 * its subtransactions, if it's a toplevel transaction).
1866 */
1867static void
1869 ReorderBufferTXN *txn,
1870 XLogRecPtr abort_lsn)
1871{
1872 ReorderBufferTXN *toptxn;
1874 bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
1875
1876 /*
1877 * The abort should happen outside streaming block, even for streamed
1878 * transactions. The transaction has to be marked as streamed, though.
1879 */
1880 Assert(!data->in_streaming);
1881
1882 /* determine the toplevel transaction */
1883 toptxn = rbtxn_get_toptxn(txn);
1884
1885 Assert(rbtxn_is_streamed(toptxn));
1886
1887 OutputPluginPrepareWrite(ctx, true);
1888 logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
1889 txn->xact_time.abort_time, write_abort_info);
1890
1891 OutputPluginWrite(ctx, true);
1892
1893 cleanup_rel_sync_cache(toptxn->xid, false);
1894}
1895
1896/*
1897 * Notify downstream to apply the streamed transaction (along with all
1898 * its subtransactions).
1899 */
1900static void
1902 ReorderBufferTXN *txn,
1903 XLogRecPtr commit_lsn)
1904{
1906
1907 /*
1908 * The commit should happen outside streaming block, even for streamed
1909 * transactions. The transaction has to be marked as streamed, though.
1910 */
1911 Assert(!data->in_streaming);
1913
1914 OutputPluginUpdateProgress(ctx, false);
1915
1916 OutputPluginPrepareWrite(ctx, true);
1917 logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
1918 OutputPluginWrite(ctx, true);
1919
1920 cleanup_rel_sync_cache(txn->xid, true);
1921}
1922
1923/*
1924 * PREPARE callback (for streaming two-phase commit).
1925 *
1926 * Notify the downstream to prepare the transaction.
1927 */
1928static void
1930 ReorderBufferTXN *txn,
1931 XLogRecPtr prepare_lsn)
1932{
1934
1935 OutputPluginUpdateProgress(ctx, false);
1936 OutputPluginPrepareWrite(ctx, true);
1937 logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1938 OutputPluginWrite(ctx, true);
1939}
1940
1941/*
1942 * Initialize the relation schema sync cache for a decoding session.
1943 *
1944 * The hash table is destroyed at the end of a decoding session. While
1945 * relcache invalidations still exist and will still be invoked, they
1946 * will just see the null hash table global and take no action.
1947 */
1948static void
1950{
1951 HASHCTL ctl;
1952 static bool relation_callbacks_registered = false;
1953
1954 /* Nothing to do if hash table already exists */
1955 if (RelationSyncCache != NULL)
1956 return;
1957
1958 /* Make a new hash table for the cache */
1959 ctl.keysize = sizeof(Oid);
1960 ctl.entrysize = sizeof(RelationSyncEntry);
1961 ctl.hcxt = cachectx;
1962
1963 RelationSyncCache = hash_create("logical replication output relation cache",
1964 128, &ctl,
1966
1967 Assert(RelationSyncCache != NULL);
1968
1969 /* No more to do if we already registered callbacks */
1970 if (relation_callbacks_registered)
1971 return;
1972
1973 /* We must update the cache entry for a relation after a relcache flush */
1975
1976 /*
1977 * Flush all cache entries after a pg_namespace change, in case it was a
1978 * schema rename affecting a relation being replicated.
1979 *
1980 * XXX: It is not a good idea to invalidate all the relation entries in
1981 * RelationSyncCache on schema rename. We can optimize it to invalidate
1982 * only the required relations by either having a specific invalidation
1983 * message containing impacted relations or by having schema information
1984 * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
1985 * passed to the callback.
1986 */
1987 CacheRegisterSyscacheCallback(NAMESPACEOID,
1989 (Datum) 0);
1990
1991 relation_callbacks_registered = true;
1992}
1993
1994/*
1996 * We expect a relatively small number of streamed transactions.
1996 */
1997static bool
1999{
2000 return list_member_xid(entry->streamed_txns, xid);
2001}
2002
2003/*
2004 * Add the xid in the rel sync entry for which we have already sent the schema
2005 * of the relation.
2006 */
2007static void
2009{
2010 MemoryContext oldctx;
2011
2013
2014 entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
2015
2016 MemoryContextSwitchTo(oldctx);
2017}
2018
2019/*
2020 * Find or create entry in the relation schema cache.
2021 *
2022 * This looks up publications that the given relation is directly or
2023 * indirectly part of (the latter if it's really the relation's ancestor that
2024 * is part of a publication) and fills up the found entry with the information
2025 * about which operations to publish and whether to use an ancestor's schema
2026 * when publishing.
2027 */
2028static RelationSyncEntry *
2030{
2031 RelationSyncEntry *entry;
2032 bool found;
2033 MemoryContext oldctx;
2034 Oid relid = RelationGetRelid(relation);
2035
2036 Assert(RelationSyncCache != NULL);
2037
2038 /* Find cached relation info, creating if not found */
2040 &relid,
2041 HASH_ENTER, &found);
2042 Assert(entry != NULL);
2043
2044 /* initialize entry, if it's new */
2045 if (!found)
2046 {
2047 entry->replicate_valid = false;
2048 entry->schema_sent = false;
2049 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2050 entry->streamed_txns = NIL;
2051 entry->pubactions.pubinsert = entry->pubactions.pubupdate =
2052 entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
2053 entry->new_slot = NULL;
2054 entry->old_slot = NULL;
2055 memset(entry->exprstate, 0, sizeof(entry->exprstate));
2056 entry->entry_cxt = NULL;
2057 entry->publish_as_relid = InvalidOid;
2058 entry->columns = NULL;
2059 entry->attrmap = NULL;
2060 }
2061
2062 /* Validate the entry */
2063 if (!entry->replicate_valid)
2064 {
2065 Oid schemaId = get_rel_namespace(relid);
2066 List *pubids = GetRelationPublications(relid);
2067
2068 /*
2069 * We don't acquire a lock on the namespace system table as we build
2070 * the cache entry using a historic snapshot and all the later changes
2071 * are absorbed while decoding WAL.
2072 */
2073 List *schemaPubids = GetSchemaPublications(schemaId);
2074 ListCell *lc;
2075 Oid publish_as_relid = relid;
2076 int publish_ancestor_level = 0;
2077 bool am_partition = get_rel_relispartition(relid);
2078 char relkind = get_rel_relkind(relid);
2079 List *rel_publications = NIL;
2080
2081 /* Reload publications if needed before use. */
2082 if (!publications_valid)
2083 {
2084 MemoryContextReset(data->pubctx);
2085
2086 oldctx = MemoryContextSwitchTo(data->pubctx);
2087 data->publications = LoadPublications(data->publication_names);
2088 MemoryContextSwitchTo(oldctx);
2089 publications_valid = true;
2090 }
2091
2092 /*
2093 * Reset schema_sent status as the relation definition may have
2094 * changed. Also reset pubactions to empty in case rel was dropped
2095 * from a publication. Also free any objects that depended on the
2096 * earlier definition.
2097 */
2098 entry->schema_sent = false;
2099 entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
2100 list_free(entry->streamed_txns);
2101 entry->streamed_txns = NIL;
2102 bms_free(entry->columns);
2103 entry->columns = NULL;
2104 entry->pubactions.pubinsert = false;
2105 entry->pubactions.pubupdate = false;
2106 entry->pubactions.pubdelete = false;
2107 entry->pubactions.pubtruncate = false;
2108
2109 /*
2110 * Tuple slots cleanups. (Will be rebuilt later if needed).
2111 */
2112 if (entry->old_slot)
2113 {
2114 TupleDesc desc = entry->old_slot->tts_tupleDescriptor;
2115
2116 Assert(desc->tdrefcount == -1);
2117
2118 ExecDropSingleTupleTableSlot(entry->old_slot);
2119
2120 /*
2121 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2122 * do it now to avoid any leaks.
2123 */
2124 FreeTupleDesc(desc);
2125 }
2126 if (entry->new_slot)
2127 {
2128 TupleDesc desc = entry->new_slot->tts_tupleDescriptor;
2129
2130 Assert(desc->tdrefcount == -1);
2131
2132 ExecDropSingleTupleTableSlot(entry->new_slot);
2133
2134 /*
2135 * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
2136 * do it now to avoid any leaks.
2137 */
2138 FreeTupleDesc(desc);
2139 }
2140
2141 entry->old_slot = NULL;
2142 entry->new_slot = NULL;
2143
2144 if (entry->attrmap)
2145 free_attrmap(entry->attrmap);
2146 entry->attrmap = NULL;
2147
2148 /*
2149 * Row filter cache cleanups.
2150 */
2151 if (entry->entry_cxt)
2152 MemoryContextDelete(entry->entry_cxt);
2153
2154 entry->entry_cxt = NULL;
2155 entry->estate = NULL;
2156 memset(entry->exprstate, 0, sizeof(entry->exprstate));
2157
2158 /*
2159 * Build publication cache. We can't use one provided by relcache as
2160 * relcache considers all publications that the given relation is in,
2161 * but here we only need to consider ones that the subscriber
2162 * requested.
2163 */
2164 foreach(lc, data->publications)
2165 {
2166 Publication *pub = lfirst(lc);
2167 bool publish = false;
2168
2169 /*
2170 * Under what relid should we publish changes in this publication?
2171 * We'll use the top-most relid across all publications. Also
2172 * track the ancestor level for this publication.
2173 */
2174 Oid pub_relid = relid;
2175 int ancestor_level = 0;
2176
2177 /*
2178 * If this is a FOR ALL TABLES publication, pick the partition
2179 * root and set the ancestor level accordingly.
2180 */
2181 if (pub->alltables)
2182 {
2183 publish = true;
2184 if (pub->pubviaroot && am_partition)
2185 {
2186 List *ancestors = get_partition_ancestors(relid);
2187
2188 pub_relid = llast_oid(ancestors);
2189 ancestor_level = list_length(ancestors);
2190 }
2191 }
2192
2193 if (!publish)
2194 {
2195 bool ancestor_published = false;
2196
2197 /*
2198 * For a partition, check if any of the ancestors are
2199 * published. If so, note down the topmost ancestor that is
2200 * published via this publication, which will be used as the
2201 * relation via which to publish the partition's changes.
2202 */
2203 if (am_partition)
2204 {
2205 Oid ancestor;
2206 int level;
2207 List *ancestors = get_partition_ancestors(relid);
2208
2209 ancestor = GetTopMostAncestorInPublication(pub->oid,
2210 ancestors,
2211 &level);
2212
2213 if (ancestor != InvalidOid)
2214 {
2215 ancestor_published = true;
2216 if (pub->pubviaroot)
2217 {
2218 pub_relid = ancestor;
2219 ancestor_level = level;
2220 }
2221 }
2222 }
2223
2224 if (list_member_oid(pubids, pub->oid) ||
2225 list_member_oid(schemaPubids, pub->oid) ||
2226 ancestor_published)
2227 publish = true;
2228 }
2229
2230 /*
2231 * If the relation is to be published, determine actions to
2232 * publish, and list of columns, if appropriate.
2233 *
2234 * Don't publish changes for partitioned tables, because
2235 * publishing those of its partitions suffices, unless partition
2236 * changes won't be published due to pubviaroot being set.
2237 */
2238 if (publish &&
2239 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
2240 {
2241 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
2242 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
2243 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
2244 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
2245
2246 /*
2247 * We want to publish the changes as the top-most ancestor
2248 * across all publications. So we need to check if the already
2249 * calculated level is higher than the new one. If yes, we can
2250 * ignore the new value (as it's a child). Otherwise the new
2251 * value is an ancestor, so we keep it.
2252 */
2253 if (publish_ancestor_level > ancestor_level)
2254 continue;
2255
2256 /*
2257 * If we found an ancestor higher up in the tree, discard the
2258 * list of publications through which we replicate it, and use
2259 * the new ancestor.
2260 */
2261 if (publish_ancestor_level < ancestor_level)
2262 {
2263 publish_as_relid = pub_relid;
2264 publish_ancestor_level = ancestor_level;
2265
2266 /* reset the publication list for this relation */
2267 rel_publications = NIL;
2268 }
2269 else
2270 {
2271 /* Same ancestor level, has to be the same OID. */
2272 Assert(publish_as_relid == pub_relid);
2273 }
2274
2275 /* Track publications for this ancestor. */
2276 rel_publications = lappend(rel_publications, pub);
2277 }
2278 }
2279
2280 entry->publish_as_relid = publish_as_relid;
2281
2282 /*
2283 * Initialize the tuple slot, map, and row filter. These are only used
2284 * when publishing inserts, updates, or deletes.
2285 */
2286 if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
2287 entry->pubactions.pubdelete)
2288 {
2289 /* Initialize the tuple slot and map */
2290 init_tuple_slot(data, relation, entry);
2291
2292 /* Initialize the row filter */
2293 pgoutput_row_filter_init(data, rel_publications, entry);
2294
2295 /* Check whether to publish generated columns. */
2296 check_and_init_gencol(data, rel_publications, entry);
2297
2298 /* Initialize the column list */
2299 pgoutput_column_list_init(data, rel_publications, entry);
2300 }
2301
2302 list_free(pubids);
2303 list_free(schemaPubids);
2304 list_free(rel_publications);
2305
2306 entry->replicate_valid = true;
2307 }
2308
2309 return entry;
2310}
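
The ancestor-level bookkeeping above boils down to: publish the partition's changes under the highest ancestor that any of the requested publications routes it through, and keep only the publications tied to that winning level for the row-filter, generated-column, and column-list setup that follows. For instance, with one publication containing the leaf directly (level 0), one containing its parent with publish_via_partition_root (level 1), and one containing the partition root (level 2), changes are sent as the root relation and only the third publication is retained. A stand-alone toy (not PostgreSQL code) that mimics the level comparison:

#include <stdio.h>

int
main(void)
{
	/* hypothetical publications and the ancestor level each one selects */
	struct
	{
		const char *pub;
		int			level;
	}			pubs[] = {
		{"pub_leaf", 0}, {"pub_parent", 1}, {"pub_root", 2},
	};
	int			winning_level = 0;
	int			kept = 0;		/* publications sharing the winning level */

	for (int i = 0; i < 3; i++)
	{
		if (pubs[i].level < winning_level)
			continue;			/* lower in the tree than what we have */
		if (pubs[i].level > winning_level)
		{
			winning_level = pubs[i].level;
			kept = 0;			/* discard publications of the lower level */
		}
		kept++;
	}
	printf("publish as ancestor level %d (%d publication(s))\n",
		   winning_level, kept);
	return 0;
}
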
2311
2312/*
2313 * Cleanup list of streamed transactions and update the schema_sent flag.
2314 *
2315 * When a streamed transaction commits or aborts, we need to remove the
2316 * toplevel XID from the schema cache. If the transaction aborted, the
2317 * subscriber will simply throw away the schema records we streamed, so
2318 * we don't need to do anything else.
2319 *
2320 * If the transaction is committed, the subscriber will update the relation
2321 * cache - so tweak the schema_sent flag accordingly.
2322 */
2323static void
2324cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
2325{
2326 HASH_SEQ_STATUS hash_seq;
2327 RelationSyncEntry *entry;
2328
2329 Assert(RelationSyncCache != NULL);
2330
2331 hash_seq_init(&hash_seq, RelationSyncCache);
2332 while ((entry = hash_seq_search(&hash_seq)) != NULL)
2333 {
2334 /*
2335 * We can set the schema_sent flag for an entry that has committed xid
2336 * in the list as that ensures that the subscriber would have the
2337 * corresponding schema and we don't need to send it unless there is
2338 * any invalidation for that relation.
2339 */
2340 foreach_xid(streamed_txn, entry->streamed_txns)
2341 {
2342 if (xid == streamed_txn)
2343 {
2344 if (is_commit)
2345 entry->schema_sent = true;
2346
2347 entry->streamed_txns =
2348 foreach_delete_current(entry->streamed_txns, streamed_txn);
2349 break;
2350 }
2351 }
2352 }
2353}
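
This routine is driven from the stream commit and abort paths. The commit-side call is visible at line 1920 above; a condensed sketch of both call sites, assuming the abort side passes is_commit = false once the toplevel transaction has been rolled back:

	/* in pgoutput_stream_commit() */
	cleanup_rel_sync_cache(txn->xid, true);

	/* in pgoutput_stream_abort(), when the toplevel transaction is aborted */
	cleanup_rel_sync_cache(toptxn->xid, false);
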
2354
2355/*
2356 * Relcache invalidation callback
2357 */
2358static void
2359rel_sync_cache_relation_cb(Datum arg, Oid relid)
2360{
2361 RelationSyncEntry *entry;
2362
2363 /*
2364 * We can get here if the plugin was used in SQL interface as the
2365 * RelationSyncCache is destroyed when the decoding finishes, but there is
2366 * no way to unregister the relcache invalidation callback.
2367 */
2368 if (RelationSyncCache == NULL)
2369 return;
2370
2371 /*
2372 * Nobody keeps pointers to entries in this hash table around outside
2373 * logical decoding callback calls - but invalidation events can come in
2374 * *during* a callback if we do any syscache access in the callback.
2375 * Because of that we must mark the cache entry as invalid but not damage
2376 * any of its substructure here. The next get_rel_sync_entry() call will
2377 * rebuild it all.
2378 */
2379 if (OidIsValid(relid))
2380 {
2381 /*
2382 * Getting invalidations for relations that aren't in the table is
2383 * entirely normal. So we don't care if it's found or not.
2384 */
2385 entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
2386 HASH_FIND, NULL);
2387 if (entry != NULL)
2388 entry->replicate_valid = false;
2389 }
2390 else
2391 {
2392 /* Whole cache must be flushed. */
2393 HASH_SEQ_STATUS status;
2394
2395 hash_seq_init(&status, RelationSyncCache);
2396 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2397 {
2398 entry->replicate_valid = false;
2399 }
2400 }
2401}
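
Note that the callback only clears replicate_valid; nothing is freed, so an entry pointer already in use stays safe and the rebuild cost is paid by the next get_rel_sync_entry() call. A stand-alone toy (not PostgreSQL code) of this mark-invalid-now, rebuild-lazily pattern:

#include <stdbool.h>
#include <stdio.h>

struct cache_entry
{
	bool		valid;
	int			derived;
};

static void
invalidate(struct cache_entry *e)
{
	e->valid = false;			/* cheap, safe to call at awkward times */
}

static int
lookup(struct cache_entry *e, int source)
{
	if (!e->valid)
	{
		e->derived = source * 2;	/* stand-in for the expensive rebuild */
		e->valid = true;
	}
	return e->derived;
}

int
main(void)
{
	struct cache_entry e = {0};

	printf("%d\n", lookup(&e, 21));	/* rebuilds: prints 42 */
	invalidate(&e);
	printf("%d\n", lookup(&e, 50));	/* rebuilds again: prints 100 */
	return 0;
}
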
2402
2403/*
2404 * Publication relation/schema map syscache invalidation callback
2405 *
2406 * Called for invalidations on pg_namespace.
2407 */
2408static void
2409rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
2410{
2411 HASH_SEQ_STATUS status;
2412 RelationSyncEntry *entry;
2413
2414 /*
2415 * We can get here if the plugin was used in SQL interface as the
2416 * RelationSyncCache is destroyed when the decoding finishes, but there is
2417 * no way to unregister the invalidation callbacks.
2418 */
2419 if (RelationSyncCache == NULL)
2420 return;
2421
2422 /*
2423 * We have no easy way to identify which cache entries this invalidation
2424 * event might have affected, so just mark them all invalid.
2425 */
2426 hash_seq_init(&status, RelationSyncCache);
2427 while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
2428 {
2429 entry->replicate_valid = false;
2430 }
2431}
2432
2433/* Send Replication origin */
2434static void
2435send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
2436 XLogRecPtr origin_lsn, bool send_origin)
2437{
2438 if (send_origin)
2439 {
2440 char *origin;
2441
2442 /*----------
2443 * XXX: which behaviour do we want here?
2444 *
2445 * Alternatives:
2446 * - don't send origin message if origin name not found
2447 * (that's what we do now)
2448 * - throw error - that will break replication, not good
2449 * - send some special "unknown" origin
2450 *----------
2451 */
2452 if (replorigin_by_oid(origin_id, true, &origin))
2453 {
2454 /* Message boundary */
2455 OutputPluginWrite(ctx, false);
2456 OutputPluginPrepareWrite(ctx, true);
2457
2458 logicalrep_write_origin(ctx->out, origin, origin_lsn);
2459 }
2460 }
2461}
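
Callers hand in the origin recorded on the decoded transaction. A condensed sketch of the typical call from the BEGIN-side callbacks, under the assumption that an origin message is only requested when the transaction actually carries a remote origin:

	send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
					 txn->origin_id != InvalidRepOriginId);
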