PostgreSQL Source Code git master
Loading...
Searching...
No Matches
publicationcmds.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * publicationcmds.c
4 * publication manipulation
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/commands/publicationcmds.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/htup_details.h"
18#include "access/table.h"
19#include "access/xact.h"
20#include "catalog/catalog.h"
21#include "catalog/indexing.h"
22#include "catalog/namespace.h"
25#include "catalog/pg_database.h"
26#include "catalog/pg_inherits.h"
28#include "catalog/pg_proc.h"
32#include "commands/defrem.h"
35#include "miscadmin.h"
36#include "nodes/nodeFuncs.h"
37#include "parser/parse_clause.h"
41#include "storage/lmgr.h"
42#include "utils/acl.h"
43#include "utils/builtins.h"
44#include "utils/inval.h"
45#include "utils/lsyscache.h"
46#include "utils/rel.h"
47#include "utils/syscache.h"
48#include "utils/varlena.h"
49
50
51/*
52 * Information used to validate the columns in the row filter expression. See
53 * contain_invalid_rfcolumn_walker for details.
54 */
55typedef struct rf_context
56{
57 Bitmapset *bms_replident; /* bitset of replica identity columns */
58 bool pubviaroot; /* true if we are validating the parent
59 * relation's row filter */
60 Oid relid; /* relid of the relation */
61 Oid parentid; /* relid of the parent relation */
63
64static List *OpenTableList(List *tables);
65static void CloseTableList(List *rels);
66static void LockSchemaList(List *schemalist);
67static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
69static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
72static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73static char defGetGeneratedColsOption(DefElem *def);
74
75
76static void
79 bool *publish_given,
80 PublicationActions *pubactions,
85{
86 ListCell *lc;
87
88 *publish_given = false;
91
92 /* defaults */
93 pubactions->pubinsert = true;
94 pubactions->pubupdate = true;
95 pubactions->pubdelete = true;
96 pubactions->pubtruncate = true;
99
100 /* Parse options */
101 foreach(lc, options)
102 {
103 DefElem *defel = (DefElem *) lfirst(lc);
104
105 if (strcmp(defel->defname, "publish") == 0)
106 {
107 char *publish;
109 ListCell *lc2;
110
111 if (*publish_given)
113
114 /*
115 * If publish option was given only the explicitly listed actions
116 * should be published.
117 */
118 pubactions->pubinsert = false;
119 pubactions->pubupdate = false;
120 pubactions->pubdelete = false;
121 pubactions->pubtruncate = false;
122
123 *publish_given = true;
124
125 /*
126 * SplitIdentifierString destructively modifies its input, so make
127 * a copy so we don't modify the memory of the executing statement
128 */
130
134 errmsg("invalid list syntax in parameter \"%s\"",
135 "publish")));
136
137 /* Process the option list. */
138 foreach(lc2, publish_list)
139 {
140 char *publish_opt = (char *) lfirst(lc2);
141
142 if (strcmp(publish_opt, "insert") == 0)
143 pubactions->pubinsert = true;
144 else if (strcmp(publish_opt, "update") == 0)
145 pubactions->pubupdate = true;
146 else if (strcmp(publish_opt, "delete") == 0)
147 pubactions->pubdelete = true;
148 else if (strcmp(publish_opt, "truncate") == 0)
149 pubactions->pubtruncate = true;
150 else
153 errmsg("unrecognized value for publication option \"%s\": \"%s\"",
154 "publish", publish_opt)));
155 }
156 }
157 else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
158 {
163 }
164 else if (strcmp(defel->defname, "publish_generated_columns") == 0)
165 {
170 }
171 else
174 errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
175 }
176}
177
178/*
179 * Convert the PublicationObjSpecType list into schema oid list and
180 * PublicationTable list.
181 */
182static void
184 List **rels, List **exceptrels, List **schemas)
185{
186 ListCell *cell;
188
189 if (!pubobjspec_list)
190 return;
191
192 foreach(cell, pubobjspec_list)
193 {
195 List *search_path;
196
197 pubobj = (PublicationObjSpec *) lfirst(cell);
198
199 switch (pubobj->pubobjtype)
200 {
202 pubobj->pubtable->except = true;
203 *exceptrels = lappend(*exceptrels, pubobj->pubtable);
204 break;
206 pubobj->pubtable->except = false;
207 *rels = lappend(*rels, pubobj->pubtable);
208 break;
210 schemaid = get_namespace_oid(pubobj->name, false);
211
212 /* Filter out duplicates if user specifies "sch1, sch1" */
213 *schemas = list_append_unique_oid(*schemas, schemaid);
214 break;
216 search_path = fetch_search_path(false);
217 if (search_path == NIL) /* nothing valid in search_path? */
220 errmsg("no schema has been selected for CURRENT_SCHEMA"));
221
222 schemaid = linitial_oid(search_path);
223 list_free(search_path);
224
225 /* Filter out duplicates if user specifies "sch1, sch1" */
226 *schemas = list_append_unique_oid(*schemas, schemaid);
227 break;
228 default:
229 /* shouldn't happen */
230 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
231 break;
232 }
233 }
234}
235
236/*
237 * Returns true if any of the columns used in the row filter WHERE expression is
238 * not part of REPLICA IDENTITY, false otherwise.
239 */
240static bool
242{
243 if (node == NULL)
244 return false;
245
246 if (IsA(node, Var))
247 {
248 Var *var = (Var *) node;
250
251 /*
252 * If pubviaroot is true, we are validating the row filter of the
253 * parent table, but the bitmap contains the replica identity
254 * information of the child table. So, get the column number of the
255 * child table as parent and child column order could be different.
256 */
257 if (context->pubviaroot)
258 {
259 char *colname = get_attname(context->parentid, attnum, false);
260
261 attnum = get_attnum(context->relid, colname);
262 }
263
265 context->bms_replident))
266 return true;
267 }
268
270 context);
271}
272
273/*
274 * Check if all columns referenced in the filter expression are part of the
275 * REPLICA IDENTITY index or not.
276 *
277 * Returns true if any invalid column is found.
278 */
279bool
281 bool pubviaroot)
282{
284 Oid relid = RelationGetRelid(relation);
285 Oid publish_as_relid = RelationGetRelid(relation);
286 bool result = false;
288 bool rfisnull;
289
290 /*
291 * FULL means all columns are in the REPLICA IDENTITY, so all columns are
292 * allowed in the row filter and we can skip the validation.
293 */
294 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
295 return false;
296
297 /*
298 * For a partition, if pubviaroot is true, find the topmost ancestor that
299 * is published via this publication as we need to use its row filter
300 * expression to filter the partition's changes.
301 *
302 * Note that even though the row filter used is for an ancestor, the
303 * REPLICA IDENTITY used will be for the actual child table.
304 */
305 if (pubviaroot && relation->rd_rel->relispartition)
306 {
307 publish_as_relid
308 = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
309
310 if (!OidIsValid(publish_as_relid))
311 publish_as_relid = relid;
312 }
313
315 ObjectIdGetDatum(publish_as_relid),
316 ObjectIdGetDatum(pubid));
317
319 return false;
320
323 &rfisnull);
324
325 if (!rfisnull)
326 {
327 rf_context context = {0};
328 Node *rfnode;
329 Bitmapset *bms = NULL;
330
331 context.pubviaroot = pubviaroot;
332 context.parentid = publish_as_relid;
333 context.relid = relid;
334
335 /* Remember columns that are part of the REPLICA IDENTITY */
338
339 context.bms_replident = bms;
341 result = contain_invalid_rfcolumn_walker(rfnode, &context);
342 }
343
345
346 return result;
347}
348
349/*
350 * Check for invalid columns in the publication table definition.
351 *
352 * This function evaluates two conditions:
353 *
354 * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
355 * by the column list. If any column is missing, *invalid_column_list is set
356 * to true.
357 * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
358 * are published, either by being explicitly named in the column list or, if
359 * no column list is specified, by setting the option
360 * publish_generated_columns to stored. If any unpublished
361 * generated column is found, *invalid_gen_col is set to true.
362 *
363 * Returns true if any of the above conditions are not met.
364 */
365bool
366pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
367 bool pubviaroot, char pubgencols_type,
369 bool *invalid_gen_col)
370{
371 Oid relid = RelationGetRelid(relation);
372 Oid publish_as_relid = RelationGetRelid(relation);
374 Bitmapset *columns = NULL;
375 TupleDesc desc = RelationGetDescr(relation);
376 Publication *pub;
377 int x;
378
379 *invalid_column_list = false;
380 *invalid_gen_col = false;
381
382 /*
383 * For a partition, if pubviaroot is true, find the topmost ancestor that
384 * is published via this publication as we need to use its column list for
385 * the changes.
386 *
387 * Note that even though the column list used is for an ancestor, the
388 * REPLICA IDENTITY used will be for the actual child table.
389 */
390 if (pubviaroot && relation->rd_rel->relispartition)
391 {
392 publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
393
394 if (!OidIsValid(publish_as_relid))
395 publish_as_relid = relid;
396 }
397
398 /* Fetch the column list */
399 pub = GetPublication(pubid);
400 check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
401
402 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
403 {
404 /* With REPLICA IDENTITY FULL, no column list is allowed. */
405 *invalid_column_list = (columns != NULL);
406
407 /*
408 * As we don't allow a column list with REPLICA IDENTITY FULL, the
409 * publish_generated_columns option must be set to stored if the table
410 * has any stored generated columns.
411 */
412 if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
413 relation->rd_att->constr &&
415 *invalid_gen_col = true;
416
417 /*
418 * Virtual generated columns are currently not supported for logical
419 * replication at all.
420 */
421 if (relation->rd_att->constr &&
423 *invalid_gen_col = true;
424
426 return true;
427 }
428
429 /* Remember columns that are part of the REPLICA IDENTITY */
432
433 /*
434 * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
435 * (to handle system columns the usual way), while column list does not
436 * use offset, so we can't do bms_is_subset(). Instead, we have to loop
437 * over the idattrs and check all of them are in the list.
438 */
439 x = -1;
440 while ((x = bms_next_member(idattrs, x)) >= 0)
441 {
444
445 if (columns == NULL)
446 {
447 /*
448 * The publish_generated_columns option must be set to stored if
449 * the REPLICA IDENTITY contains any stored generated column.
450 */
451 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
452 {
453 *invalid_gen_col = true;
454 break;
455 }
456
457 /*
458 * The equivalent setting for virtual generated columns does not
459 * exist yet.
460 */
461 if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
462 {
463 *invalid_gen_col = true;
464 break;
465 }
466
467 /* Skip validating the column list since it is not defined */
468 continue;
469 }
470
471 /*
472 * If pubviaroot is true, we are validating the column list of the
473 * parent table, but the bitmap contains the replica identity
474 * information of the child table. The parent/child attnums may not
475 * match, so translate them to the parent - get the attname from the
476 * child, and look it up in the parent.
477 */
478 if (pubviaroot)
479 {
480 /* attribute name in the child table */
481 char *colname = get_attname(relid, attnum, false);
482
483 /*
484 * Determine the attnum for the attribute name in parent (we are
485 * using the column list defined on the parent).
486 */
487 attnum = get_attnum(publish_as_relid, colname);
488 }
489
490 /* replica identity column, not covered by the column list */
492
494 break;
495 }
496
497 bms_free(columns);
499
501}
502
503/*
504 * Invalidate entries in the RelationSyncCache for relations included in the
505 * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
506 *
507 * If 'puballtables' is true, invalidate all cache entries.
508 */
509void
510InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
511{
512 if (puballtables)
513 {
515 }
516 else
517 {
518 List *relids = NIL;
520
521 /*
522 * For partitioned tables, we must invalidate all partitions and
523 * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
524 * a target. However, WAL records for TRUNCATE specify both a root and
525 * its leaves.
526 */
527 relids = GetIncludedPublicationRelations(pubid,
531
532 relids = list_concat_unique_oid(relids, schemarelids);
533
534 /* Invalidate the relsyncache */
535 foreach_oid(relid, relids)
537 }
538
539 return;
540}
541
542/* check_functions_in_node callback */
543static bool
549
550/*
551 * The row filter walker checks if the row filter expression is a "simple
552 * expression".
553 *
554 * It allows only simple or compound expressions such as:
555 * - (Var Op Const)
556 * - (Var Op Var)
557 * - (Var Op Const) AND/OR (Var Op Const)
558 * - etc
559 * (where Var is a column of the table this filter belongs to)
560 *
561 * The simple expression has the following restrictions:
562 * - User-defined operators are not allowed;
563 * - User-defined functions are not allowed;
564 * - User-defined types are not allowed;
565 * - User-defined collations are not allowed;
566 * - Non-immutable built-in functions are not allowed;
567 * - System columns are not allowed.
568 *
569 * NOTES
570 *
571 * We don't allow user-defined functions/operators/types/collations because
572 * (a) if a user drops a user-defined object used in a row filter expression or
573 * if there is any other error while using it, the logical decoding
574 * infrastructure won't be able to recover from such an error even if the
575 * object is recreated again because a historic snapshot is used to evaluate
576 * the row filter;
577 * (b) a user-defined function can be used to access tables that could have
578 * unpleasant results because a historic snapshot is used. That's why only
579 * immutable built-in functions are allowed in row filter expressions.
580 *
581 * We don't allow system columns because currently, we don't have that
582 * information in the tuple passed to downstream. Also, as we don't replicate
583 * those to subscribers, there doesn't seem to be a need for a filter on those
584 * columns.
585 *
586 * We can allow other node types after more analysis and testing.
587 */
588static bool
590{
591 char *errdetail_msg = NULL;
592
593 if (node == NULL)
594 return false;
595
596 switch (nodeTag(node))
597 {
598 case T_Var:
599 /* System columns are not allowed. */
600 if (((Var *) node)->varattno < InvalidAttrNumber)
601 errdetail_msg = _("System columns are not allowed.");
602 break;
603 case T_OpExpr:
604 case T_DistinctExpr:
605 case T_NullIfExpr:
606 /* OK, except user-defined operators are not allowed. */
607 if (((OpExpr *) node)->opno >= FirstNormalObjectId)
608 errdetail_msg = _("User-defined operators are not allowed.");
609 break;
611 /* OK, except user-defined operators are not allowed. */
612 if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
613 errdetail_msg = _("User-defined operators are not allowed.");
614
615 /*
616 * We don't need to check the hashfuncid and negfuncid of
617 * ScalarArrayOpExpr as those functions are only built for a
618 * subquery.
619 */
620 break;
621 case T_RowCompareExpr:
622 {
623 ListCell *opid;
624
625 /* OK, except user-defined operators are not allowed. */
626 foreach(opid, ((RowCompareExpr *) node)->opnos)
627 {
629 {
630 errdetail_msg = _("User-defined operators are not allowed.");
631 break;
632 }
633 }
634 }
635 break;
636 case T_Const:
637 case T_FuncExpr:
638 case T_BoolExpr:
639 case T_RelabelType:
640 case T_CollateExpr:
641 case T_CaseExpr:
642 case T_CaseTestExpr:
643 case T_ArrayExpr:
644 case T_RowExpr:
645 case T_CoalesceExpr:
646 case T_MinMaxExpr:
647 case T_XmlExpr:
648 case T_NullTest:
649 case T_BooleanTest:
650 case T_List:
651 /* OK, supported */
652 break;
653 default:
654 errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
655 break;
656 }
657
658 /*
659 * For all the supported nodes, if we haven't already found a problem,
660 * check the types, functions, and collations used in it. We check List
661 * by walking through each element.
662 */
663 if (!errdetail_msg && !IsA(node, List))
664 {
665 if (exprType(node) >= FirstNormalObjectId)
666 errdetail_msg = _("User-defined types are not allowed.");
668 pstate))
669 errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
670 else if (exprCollation(node) >= FirstNormalObjectId ||
672 errdetail_msg = _("User-defined collations are not allowed.");
673 }
674
675 /*
676 * If we found a problem in this node, throw error now. Otherwise keep
677 * going.
678 */
679 if (errdetail_msg)
682 errmsg("invalid publication WHERE expression"),
684 parser_errposition(pstate, exprLocation(node))));
685
687 pstate);
688}
689
690/*
691 * Check if the row filter expression is a "simple expression".
692 *
693 * See check_simple_rowfilter_expr_walker for details.
694 */
695static bool
697{
698 return check_simple_rowfilter_expr_walker(node, pstate);
699}
700
701/*
702 * Transform the publication WHERE expression for all the relations in the list,
703 * ensuring it is coerced to boolean and necessary collation information is
704 * added if required, and add a new nsitem/RTE for the associated relation to
705 * the ParseState's namespace list.
706 *
707 * Also check the publication row filter expression and throw an error if
708 * anything not permitted or unexpected is encountered.
709 */
710static void
711TransformPubWhereClauses(List *tables, const char *queryString,
712 bool pubviaroot)
713{
714 ListCell *lc;
715
716 foreach(lc, tables)
717 {
720 ParseState *pstate;
722
723 if (pri->whereClause == NULL)
724 continue;
725
726 /*
727 * If the publication doesn't publish changes via the root partitioned
728 * table, the partition's row filter will be used. So disallow using
729 * WHERE clause on partitioned table in this case.
730 */
731 if (!pubviaroot &&
732 pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
735 errmsg("cannot use publication WHERE clause for relation \"%s\"",
736 RelationGetRelationName(pri->relation)),
737 errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
738 "publish_via_partition_root")));
739
740 /*
741 * A fresh pstate is required so that we only have "this" table in its
742 * rangetable
743 */
744 pstate = make_parsestate(NULL);
745 pstate->p_sourcetext = queryString;
746 nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
748 false, false);
749 addNSItemToQuery(pstate, nsitem, false, true, true);
750
752 copyObject(pri->whereClause),
754 "PUBLICATION WHERE");
755
756 /* Fix up collation information */
758
760
761 /*
762 * We allow only simple expressions in row filters. See
763 * check_simple_rowfilter_expr_walker.
764 */
766
767 free_parsestate(pstate);
768
769 pri->whereClause = whereclause;
770 }
771}
772
773
774/*
775 * Given a list of tables that are going to be added to a publication,
776 * verify that they fulfill the necessary preconditions, namely: no tables
777 * have a column list if any schema is published; and partitioned tables do
778 * not have column lists if publish_via_partition_root is not set.
779 *
780 * 'publish_schema' indicates that the publication contains any TABLES IN
781 * SCHEMA elements (newly added in this command, or preexisting).
782 * 'pubviaroot' is the value of publish_via_partition_root.
783 */
784static void
785CheckPubRelationColumnList(char *pubname, List *tables,
786 bool publish_schema, bool pubviaroot)
787{
788 ListCell *lc;
789
790 foreach(lc, tables)
791 {
793
794 if (pri->columns == NIL)
795 continue;
796
797 /*
798 * Disallow specifying column list if any schema is in the
799 * publication.
800 *
801 * XXX We could instead just forbid the case when the publication
802 * tries to publish the table with a column list and a schema for that
803 * table. However, if we do that then we need a restriction during
804 * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
805 * seem to be a good idea.
806 */
807 if (publish_schema)
810 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
812 RelationGetRelationName(pri->relation), pubname),
813 errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
814
815 /*
816 * If the publication doesn't publish changes via the root partitioned
817 * table, the partition's column list will be used. So disallow using
818 * a column list on the partitioned table in this case.
819 */
820 if (!pubviaroot &&
821 pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
824 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
826 RelationGetRelationName(pri->relation), pubname),
827 errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
828 "publish_via_partition_root")));
829 }
830}
831
832/*
833 * Create new publication.
834 */
837{
838 Relation rel;
840 Oid puboid;
841 bool nulls[Natts_pg_publication];
844 bool publish_given;
845 PublicationActions pubactions;
851 List *relations = NIL;
854
855 /* must have CREATE privilege on database */
857 if (aclresult != ACLCHECK_OK)
860
861 /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
862 if (!superuser())
863 {
864 if (stmt->for_all_tables || stmt->for_all_sequences)
867 errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
868 }
869
871
872 /* Check if name is used */
874 CStringGetDatum(stmt->pubname));
875 if (OidIsValid(puboid))
878 errmsg("publication \"%s\" already exists",
879 stmt->pubname)));
880
881 /* Form a tuple. */
882 memset(values, 0, sizeof(values));
883 memset(nulls, false, sizeof(nulls));
884
888
890 stmt->options,
891 &publish_given, &pubactions,
896
897 if (stmt->for_all_sequences &&
902 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
903
908 BoolGetDatum(stmt->for_all_tables);
910 BoolGetDatum(stmt->for_all_sequences);
912 BoolGetDatum(pubactions.pubinsert);
914 BoolGetDatum(pubactions.pubupdate);
916 BoolGetDatum(pubactions.pubdelete);
918 BoolGetDatum(pubactions.pubtruncate);
923
925
926 /* Insert tuple into catalog. */
929
931
933
934 /* Make the changes visible. */
936
937 /* Associate objects with the publication. */
938 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
940
941 if (stmt->for_all_tables)
942 {
943 /* Process EXCEPT table list */
944 if (exceptrelations != NIL)
945 {
946 List *rels;
947
949 PublicationAddTables(puboid, rels, true, NULL);
950 CloseTableList(rels);
951 }
952
953 /*
954 * Invalidate relcache so that publication info is rebuilt. Sequences
955 * publication doesn't require invalidation, as replica identity
956 * checks don't apply to them.
957 */
959 }
960 else if (!stmt->for_all_sequences)
961 {
962 /* FOR TABLES IN SCHEMA requires superuser */
963 if (schemaidlist != NIL && !superuser())
966 errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
967
968 if (relations != NIL)
969 {
970 List *rels;
971
972 rels = OpenTableList(relations);
975
976 CheckPubRelationColumnList(stmt->pubname, rels,
977 schemaidlist != NIL,
979
980 PublicationAddTables(puboid, rels, true, NULL);
981 CloseTableList(rels);
982 }
983
984 if (schemaidlist != NIL)
985 {
986 /*
987 * Schema lock is held until the publication is created to prevent
988 * concurrent schema deletion.
989 */
992 }
993 }
994
996
998
999 /*
1000 * We don't need this warning message when wal_level >= 'replica' since
1001 * logical decoding is automatically enabled up on a logical slot
1002 * creation.
1003 */
1007 errmsg("logical decoding must be enabled to publish logical changes"),
1008 errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
1009
1010 return myself;
1011}
1012
1013/*
1014 * Change options of a publication.
1015 */
1016static void
1018 Relation rel, HeapTuple tup)
1019{
1020 bool nulls[Natts_pg_publication];
1023 bool publish_given;
1024 PublicationActions pubactions;
1029 ObjectAddress obj;
1031 List *root_relids = NIL;
1032 ListCell *lc;
1033
1035
1037 stmt->options,
1038 &publish_given, &pubactions,
1043
1044 if (pubform->puballsequences &&
1049 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
1050
1051 /*
1052 * If the publication doesn't publish changes via the root partitioned
1053 * table, the partition's row filter and column list will be used. So
1054 * disallow using WHERE clause and column lists on partitioned table in
1055 * this case.
1056 */
1057 if (!pubform->puballtables && publish_via_partition_root_given &&
1059 {
1060 /*
1061 * Lock the publication so nobody else can do anything with it. This
1062 * prevents concurrent alter to add partitioned table(s) with WHERE
1063 * clause(s) and/or column lists which we don't allow when not
1064 * publishing via root.
1065 */
1068
1071
1072 foreach(lc, root_relids)
1073 {
1074 Oid relid = lfirst_oid(lc);
1076 char relkind;
1077 char *relname;
1078 bool has_rowfilter;
1079 bool has_collist;
1080
1081 /*
1082 * Beware: we don't have lock on the relations, so cope silently
1083 * with the cache lookups returning NULL.
1084 */
1085
1087 ObjectIdGetDatum(relid),
1090 continue;
1093 if (!has_rowfilter && !has_collist)
1094 {
1096 continue;
1097 }
1098
1099 relkind = get_rel_relkind(relid);
1100 if (relkind != RELKIND_PARTITIONED_TABLE)
1101 {
1103 continue;
1104 }
1105 relname = get_rel_name(relid);
1106 if (relname == NULL) /* table concurrently dropped */
1107 {
1109 continue;
1110 }
1111
1112 if (has_rowfilter)
1113 ereport(ERROR,
1115 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1116 "publish_via_partition_root",
1117 stmt->pubname),
1118 errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1119 relname, "publish_via_partition_root")));
1121 ereport(ERROR,
1123 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1124 "publish_via_partition_root",
1125 stmt->pubname),
1126 errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1127 relname, "publish_via_partition_root")));
1128 }
1129 }
1130
1131 /* Everything ok, form a new tuple. */
1132 memset(values, 0, sizeof(values));
1133 memset(nulls, false, sizeof(nulls));
1134 memset(replaces, false, sizeof(replaces));
1135
1136 if (publish_given)
1137 {
1140
1143
1146
1149 }
1150
1152 {
1155 }
1156
1158 {
1161 }
1162
1164 replaces);
1165
1166 /* Update the catalog. */
1167 CatalogTupleUpdate(rel, &tup->t_self, tup);
1168
1170
1172
1173 /* Invalidate the relcache. */
1174 if (pubform->puballtables)
1175 {
1177 }
1178 else
1179 {
1180 List *relids = NIL;
1182
1183 /*
1184 * For any partitioned tables contained in the publication, we must
1185 * invalidate all partitions contained in the respective partition
1186 * trees, not just those explicitly mentioned in the publication.
1187 */
1188 if (root_relids == NIL)
1191 else
1192 {
1193 /*
1194 * We already got tables explicitly mentioned in the publication.
1195 * Now get all partitions for the partitioned table in the list.
1196 */
1197 foreach(lc, root_relids)
1198 relids = GetPubPartitionOptionRelations(relids,
1200 lfirst_oid(lc));
1201 }
1202
1205 relids = list_concat_unique_oid(relids, schemarelids);
1206
1208 }
1209
1212 (Node *) stmt);
1213
1215}
1216
1217/*
1218 * Invalidate the relations.
1219 */
1220void
1222{
1223 /*
1224 * We don't want to send too many individual messages, at some point it's
1225 * cheaper to just reset whole relcache.
1226 */
1228 {
1229 ListCell *lc;
1230
1231 foreach(lc, relids)
1233 }
1234 else
1236}
1237
1238/*
1239 * Add or remove table to/from publication.
1240 */
1241static void
1243 List *tables, const char *queryString,
1244 bool publish_schema)
1245{
1246 List *rels = NIL;
1248 Oid pubid = pubform->oid;
1249
1250 /*
1251 * Nothing to do if no objects, except in SET: for that it is quite
1252 * possible that user has not specified any tables in which case we need
1253 * to remove all the existing tables.
1254 */
1255 if (!tables && stmt->action != AP_SetObjects)
1256 return;
1257
1258 rels = OpenTableList(tables);
1259
1260 if (stmt->action == AP_AddObjects)
1261 {
1262 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1263
1265
1267 pubform->pubviaroot);
1268
1269 PublicationAddTables(pubid, rels, false, stmt);
1270 }
1271 else if (stmt->action == AP_DropObjects)
1272 PublicationDropTables(pubid, rels, false);
1273 else /* AP_SetObjects */
1274 {
1275 List *oldrelids = NIL;
1276 List *delrels = NIL;
1277 ListCell *oldlc;
1278
1279 if (stmt->for_all_tables || stmt->for_all_sequences)
1280 {
1281 /*
1282 * In FOR ALL TABLES mode, relations are tracked as exclusions
1283 * (EXCEPT TABLES). Fetch the current excluded relations so they
1284 * can be reconciled with the specified EXCEPT list.
1285 *
1286 * This applies only if the existing publication is already
1287 * defined as FOR ALL TABLES; otherwise, there are no exclusion
1288 * entries to process.
1289 */
1290 if (pubform->puballtables)
1291 {
1294 }
1295 }
1296 else
1297 {
1300
1301 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1302
1304 pubform->pubviaroot);
1305 }
1306
1307 /*
1308 * To recreate the relation list for the publication, look for
1309 * existing relations that do not need to be dropped.
1310 */
1311 foreach(oldlc, oldrelids)
1312 {
1314 ListCell *newlc;
1316 bool found = false;
1320
1321 /* look up the cache for the old relmap */
1324 ObjectIdGetDatum(pubid));
1325
1326 /*
1327 * See if the existing relation currently has a WHERE clause or a
1328 * column list. We need to compare those too.
1329 */
1331 {
1332 bool isnull = true;
1335
1336 /* Load the WHERE clause for this table. */
1339 &isnull);
1340 if (!isnull)
1342
1343 /* Transform the int2vector column list to a bitmap. */
1346 &isnull);
1347
1348 if (!isnull)
1350
1352 }
1353
1354 foreach(newlc, rels)
1355 {
1357 Oid newrelid;
1359
1361 newrelid = RelationGetRelid(newpubrel->relation);
1362
1363 /*
1364 * Validate the column list. If the column list or WHERE
1365 * clause changes, then the validation done here will be
1366 * duplicated inside PublicationAddTables(). The validation
1367 * is cheap enough that that seems harmless.
1368 */
1370 newpubrel->columns);
1371
1372 /*
1373 * Check if any of the new set of relations matches with the
1374 * existing relations in the publication. Additionally, if the
1375 * relation has an associated WHERE clause, check the WHERE
1376 * expressions also match. Same for the column list. Drop the
1377 * rest.
1378 */
1379 if (newrelid == oldrelid)
1380 {
1381 if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1383 {
1384 found = true;
1385 break;
1386 }
1387 }
1388 }
1389
1390 /*
1391 * Add the non-matched relations to a list so that they can be
1392 * dropped.
1393 */
1394 if (!found)
1395 {
1397 oldrel->whereClause = NULL;
1398 oldrel->columns = NIL;
1399 oldrel->except = false;
1400 oldrel->relation = table_open(oldrelid,
1403 }
1404 }
1405
1406 /* And drop them. */
1407 PublicationDropTables(pubid, delrels, true);
1408
1409 /*
1410 * Don't bother calculating the difference for adding, we'll catch and
1411 * skip existing ones when doing catalog update.
1412 */
1413 PublicationAddTables(pubid, rels, true, stmt);
1414
1416 }
1417
1418 CloseTableList(rels);
1419}
1420
1421/*
1422 * Alter the publication schemas.
1423 *
1424 * Add or remove schemas to/from publication.
1425 */
1426static void
1429{
1431
1432 /*
1433 * Nothing to do if no objects, except in SET: for that it is quite
1434 * possible that user has not specified any schemas in which case we need
1435 * to remove all the existing schemas.
1436 */
1437 if (!schemaidlist && stmt->action != AP_SetObjects)
1438 return;
1439
1440 /*
1441 * Schema lock is held until the publication is altered to prevent
1442 * concurrent schema deletion.
1443 */
1445 if (stmt->action == AP_AddObjects)
1446 {
1447 ListCell *lc;
1448 List *reloids;
1449
1452
1453 foreach(lc, reloids)
1454 {
1456
1460
1462 continue;
1463
1464 /*
1465 * Disallow adding schema if column list is already part of the
1466 * publication. See CheckPubRelationColumnList.
1467 */
1469 ereport(ERROR,
1471 errmsg("cannot add schema to publication \"%s\"",
1472 stmt->pubname),
1473 errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1474
1476 }
1477
1479 }
1480 else if (stmt->action == AP_DropObjects)
1482 else /* AP_SetObjects */
1483 {
1485 List *delschemas = NIL;
1486
1487 /* Identify which schemas should be dropped */
1489
1490 /*
1491 * Schema lock is held until the publication is altered to prevent
1492 * concurrent schema deletion.
1493 */
1495
1496 /* And drop them */
1498
1499 /*
1500 * Don't bother calculating the difference for adding, we'll catch and
1501 * skip existing ones when doing catalog update.
1502 */
1504 }
1505}
1506
1507/*
1508 * Check if relations and schemas can be in a given publication and throw
1509 * appropriate error if not.
1510 */
1511static void
1513 List *tables, List *schemaidlist)
1514{
1516
1517 if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1518 schemaidlist && !superuser())
1519 ereport(ERROR,
1521 errmsg("must be superuser to add or set schemas")));
1522
1523 if (stmt->for_all_tables && !superuser())
1524 ereport(ERROR,
1526 errmsg("must be superuser to set ALL TABLES"));
1527
1528 if (stmt->for_all_sequences && !superuser())
1529 ereport(ERROR,
1531 errmsg("must be superuser to set ALL SEQUENCES"));
1532
1533 /*
1534 * Check that user is allowed to manipulate the publication tables in
1535 * schema
1536 */
1537 if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
1538 {
1539 if (pubform->puballtables && pubform->puballsequences)
1540 ereport(ERROR,
1542 errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1543 NameStr(pubform->pubname)),
1544 errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1545 else if (pubform->puballtables)
1546 ereport(ERROR,
1548 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1549 NameStr(pubform->pubname)),
1550 errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
1551 else
1552 ereport(ERROR,
1554 errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1555 NameStr(pubform->pubname)),
1556 errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1557 }
1558
1559 /* Check that user is allowed to manipulate the publication tables. */
1560 if (tables && (pubform->puballtables || pubform->puballsequences))
1561 {
1562 if (pubform->puballtables && pubform->puballsequences)
1563 ereport(ERROR,
1565 errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1566 NameStr(pubform->pubname)),
1567 errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1568 else if (pubform->puballtables)
1569 ereport(ERROR,
1571 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1572 NameStr(pubform->pubname)),
1573 errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
1574 else
1575 ereport(ERROR,
1577 errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1578 NameStr(pubform->pubname)),
1579 errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1580 }
1581
1582 if (stmt->for_all_tables || stmt->for_all_sequences)
1583 {
1584 /*
1585 * If the publication already contains specific tables or schemas, we
1586 * prevent switching to a ALL state.
1587 */
1588 if (is_table_publication(pubform->oid) ||
1590 {
1591 ereport(ERROR,
1593 stmt->for_all_tables ?
1594 errmsg("publication \"%s\" does not support ALL TABLES operations", NameStr(pubform->pubname)) :
1595 errmsg("publication \"%s\" does not support ALL SEQUENCES operations", NameStr(pubform->pubname)),
1596 errdetail("This operation requires the publication to be defined as FOR ALL TABLES/SEQUENCES or to be empty."));
1597 }
1598 }
1599}
1600
1601/*
1602 * Update FOR ALL TABLES / FOR ALL SEQUENCES flags of a publication.
1603 */
1604static void
1606 HeapTuple tup)
1607{
1609 bool nulls[Natts_pg_publication] = {0};
1610 bool replaces[Natts_pg_publication] = {0};
1612 bool dirty = false;
1613
1614 if (!stmt->for_all_tables && !stmt->for_all_sequences)
1615 return;
1616
1618
1619 /* Update FOR ALL TABLES flag if changed */
1620 if (stmt->for_all_tables != pubform->puballtables)
1621 {
1623 BoolGetDatum(stmt->for_all_tables);
1625 dirty = true;
1626 }
1627
1628 /* Update FOR ALL SEQUENCES flag if changed */
1629 if (stmt->for_all_sequences != pubform->puballsequences)
1630 {
1632 BoolGetDatum(stmt->for_all_sequences);
1634 dirty = true;
1635 }
1636
1637 if (dirty)
1638 {
1640 nulls, replaces);
1641 CatalogTupleUpdate(rel, &tup->t_self, tup);
1643
1644 /* For ALL TABLES, we must invalidate all relcache entries */
1647 }
1648}
1649
1650/*
1651 * Alter the existing publication.
1652 *
1653 * This is dispatcher function for AlterPublicationOptions,
1654 * AlterPublicationSchemas and AlterPublicationTables.
1655 */
1656void
1658{
1659 Relation rel;
1660 HeapTuple tup;
1662
1664
1666 CStringGetDatum(stmt->pubname));
1667
1668 if (!HeapTupleIsValid(tup))
1669 ereport(ERROR,
1671 errmsg("publication \"%s\" does not exist",
1672 stmt->pubname)));
1673
1675
1676 /* must be owner */
1679 stmt->pubname);
1680
1681 if (stmt->options)
1682 AlterPublicationOptions(pstate, stmt, rel, tup);
1683 else
1684 {
1685 List *relations = NIL;
1688 Oid pubid = pubform->oid;
1689
1690 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1692
1694
1696
1697 /* Lock the publication so nobody else can do anything with it. */
1700
1701 /*
1702 * It is possible that by the time we acquire the lock on publication,
1703 * concurrent DDL has removed it. We can test this by checking the
1704 * existence of publication. We get the tuple again to avoid the risk
1705 * of any publication option getting changed.
1706 */
1708 if (!HeapTupleIsValid(tup))
1709 ereport(ERROR,
1711 errmsg("publication \"%s\" does not exist",
1712 stmt->pubname));
1713
1714 relations = list_concat(relations, exceptrelations);
1715 AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1716 schemaidlist != NIL);
1719 }
1720
1721 /* Cleanup. */
1724}
1725
1726/*
1727 * Remove relation from publication by mapping OID.
1728 */
1729void
1731{
1732 Relation rel;
1733 HeapTuple tup;
1735 List *relids = NIL;
1736
1738
1740
1741 if (!HeapTupleIsValid(tup))
1742 elog(ERROR, "cache lookup failed for publication table %u",
1743 proid);
1744
1746
1747 /*
1748 * Invalidate relcache so that publication info is rebuilt.
1749 *
1750 * For the partitioned tables, we must invalidate all partitions contained
1751 * in the respective partition hierarchies, not just the one explicitly
1752 * mentioned in the publication. This is required because we implicitly
1753 * publish the child tables when the parent table is published.
1754 */
1756 pubrel->prrelid);
1757
1759
1760 CatalogTupleDelete(rel, &tup->t_self);
1761
1763
1765}
1766
1767/*
1768 * Remove the publication by mapping OID.
1769 */
1770void
1772{
1773 Relation rel;
1774 HeapTuple tup;
1776
1778
1780 if (!HeapTupleIsValid(tup))
1781 elog(ERROR, "cache lookup failed for publication %u", pubid);
1782
1784
1785 /* Invalidate relcache so that publication info is rebuilt. */
1786 if (pubform->puballtables)
1788
1789 CatalogTupleDelete(rel, &tup->t_self);
1790
1792
1794}
1795
1796/*
1797 * Remove schema from publication by mapping OID.
1798 */
1799void
1801{
1802 Relation rel;
1803 HeapTuple tup;
1804 List *schemaRels = NIL;
1806
1808
1810
1811 if (!HeapTupleIsValid(tup))
1812 elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1813
1815
1816 /*
1817 * Invalidate relcache so that publication info is rebuilt. See
1818 * RemovePublicationRelById for why we need to consider all the
1819 * partitions.
1820 */
1824
1825 CatalogTupleDelete(rel, &tup->t_self);
1826
1828
1830}
1831
1832/*
1833 * Open relations specified by a PublicationTable list.
1834 * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1835 * add them to a publication.
1836 */
1837static List *
1839{
1840 List *relids = NIL;
1841 List *rels = NIL;
1842 ListCell *lc;
1845
1846 /*
1847 * Open, share-lock, and check all the explicitly-specified relations
1848 */
1849 foreach(lc, tables)
1850 {
1852 bool recurse = t->relation->inh;
1853 Relation rel;
1854 Oid myrelid;
1856
1857 /* Allow query cancel in case this takes a long time */
1859
1862
1863 /*
1864 * Filter out duplicates if user specifies "foo, foo".
1865 *
1866 * Note that this algorithm is known to not be very efficient (O(N^2))
1867 * but given that it only works on list of tables given to us by user
1868 * it's deemed acceptable.
1869 */
1870 if (list_member_oid(relids, myrelid))
1871 {
1872 /* Disallow duplicate tables if there are any with row filters. */
1874 ereport(ERROR,
1876 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1878
1879 /* Disallow duplicate tables if there are any with column lists. */
1881 ereport(ERROR,
1883 errmsg("conflicting or redundant column lists for table \"%s\"",
1885
1887 continue;
1888 }
1889
1891 pub_rel->relation = rel;
1892 pub_rel->whereClause = t->whereClause;
1893 pub_rel->columns = t->columns;
1894 pub_rel->except = t->except;
1895 rels = lappend(rels, pub_rel);
1896 relids = lappend_oid(relids, myrelid);
1897
1898 if (t->whereClause)
1900
1901 if (t->columns)
1903
1904 /*
1905 * Add children of this rel, if requested, so that they too are added
1906 * to the publication. A partitioned table can't have any inheritance
1907 * children other than its partitions, which need not be explicitly
1908 * added to the publication.
1909 */
1910 if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1911 {
1912 List *children;
1913 ListCell *child;
1914
1916 NULL);
1917
1918 foreach(child, children)
1919 {
1920 Oid childrelid = lfirst_oid(child);
1921
1922 /* Allow query cancel in case this takes a long time */
1924
1925 /*
1926 * Skip duplicates if user specified both parent and child
1927 * tables.
1928 */
1929 if (list_member_oid(relids, childrelid))
1930 {
1931 /*
1932 * We don't allow to specify row filter for both parent
1933 * and child table at the same time as it is not very
1934 * clear which one should be given preference.
1935 */
1936 if (childrelid != myrelid &&
1938 ereport(ERROR,
1940 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1942
1943 /*
1944 * We don't allow to specify column list for both parent
1945 * and child table at the same time as it is not very
1946 * clear which one should be given preference.
1947 */
1948 if (childrelid != myrelid &&
1950 ereport(ERROR,
1952 errmsg("conflicting or redundant column lists for table \"%s\"",
1954
1955 continue;
1956 }
1957
1958 /* find_all_inheritors already got lock */
1961 pub_rel->relation = rel;
1962 /* child inherits WHERE clause from parent */
1963 pub_rel->whereClause = t->whereClause;
1964
1965 /* child inherits column list from parent */
1966 pub_rel->columns = t->columns;
1967 pub_rel->except = t->except;
1968 rels = lappend(rels, pub_rel);
1969 relids = lappend_oid(relids, childrelid);
1970
1971 if (t->whereClause)
1973
1974 if (t->columns)
1976 }
1977 }
1978 }
1979
1980 list_free(relids);
1982
1983 return rels;
1984}
1985
1986/*
1987 * Close all relations in the list.
1988 */
1989static void
1991{
1992 ListCell *lc;
1993
1994 foreach(lc, rels)
1995 {
1997
1999 table_close(pub_rel->relation, NoLock);
2000 }
2001
2002 list_free_deep(rels);
2003}
2004
2005/*
2006 * Lock the schemas specified in the schema list in AccessShareLock mode in
2007 * order to prevent concurrent schema deletion.
2008 */
2009static void
2011{
2012 ListCell *lc;
2013
2014 foreach(lc, schemalist)
2015 {
2017
2018 /* Allow query cancel in case this takes a long time */
2021
2022 /*
2023 * It is possible that by the time we acquire the lock on schema,
2024 * concurrent DDL has removed it. We can test this by checking the
2025 * existence of schema.
2026 */
2028 ereport(ERROR,
2030 errmsg("schema with OID %u does not exist", schemaid));
2031 }
2032}
2033
2034/*
2035 * Add listed tables to the publication.
2036 */
2037static void
2038PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
2040{
2041 ListCell *lc;
2042
2043 foreach(lc, rels)
2044 {
2046 Relation rel = pub_rel->relation;
2047 ObjectAddress obj;
2048
2049 /* Must be owner of the table or superuser. */
2053
2054 obj = publication_add_relation(pubid, pub_rel, if_not_exists, stmt);
2055 if (stmt)
2056 {
2058 (Node *) stmt);
2059
2061 obj.objectId, 0);
2062 }
2063 }
2064}
2065
2066/*
2067 * Remove listed tables from the publication.
2068 */
2069static void
2070PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
2071{
2072 ObjectAddress obj;
2073 ListCell *lc;
2074 Oid prid;
2075
2076 foreach(lc, rels)
2077 {
2079 Relation rel = pubrel->relation;
2080 Oid relid = RelationGetRelid(rel);
2081
2082 if (pubrel->columns)
2083 ereport(ERROR,
2085 errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
2086
2088 ObjectIdGetDatum(relid),
2089 ObjectIdGetDatum(pubid));
2090 if (!OidIsValid(prid))
2091 {
2092 if (missing_ok)
2093 continue;
2094
2095 ereport(ERROR,
2097 errmsg("relation \"%s\" is not part of the publication",
2099 }
2100
2101 if (pubrel->whereClause)
2102 ereport(ERROR,
2104 errmsg("cannot use a WHERE clause when removing a table from a publication")));
2105
2107 performDeletion(&obj, DROP_CASCADE, 0);
2108 }
2109}
2110
2111/*
2112 * Add listed schemas to the publication.
2113 */
2114static void
2115PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
2117{
2118 ListCell *lc;
2119
2120 foreach(lc, schemas)
2121 {
2123 ObjectAddress obj;
2124
2125 obj = publication_add_schema(pubid, schemaid, if_not_exists);
2126 if (stmt)
2127 {
2129 (Node *) stmt);
2130
2132 obj.objectId, 0);
2133 }
2134 }
2135}
2136
2137/*
2138 * Remove listed schemas from the publication.
2139 */
2140static void
2141PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
2142{
2143 ObjectAddress obj;
2144 ListCell *lc;
2145 Oid psid;
2146
2147 foreach(lc, schemas)
2148 {
2150
2154 ObjectIdGetDatum(pubid));
2155 if (!OidIsValid(psid))
2156 {
2157 if (missing_ok)
2158 continue;
2159
2160 ereport(ERROR,
2162 errmsg("tables from schema \"%s\" are not part of the publication",
2164 }
2165
2167 performDeletion(&obj, DROP_CASCADE, 0);
2168 }
2169}
2170
2171/*
2172 * Internal workhorse for changing a publication owner
2173 */
2174static void
2176{
2178
2180
2181 if (form->pubowner == newOwnerId)
2182 return;
2183
2184 if (!superuser())
2185 {
2187
2188 /* Must be owner */
2191 NameStr(form->pubname));
2192
2193 /* Must be able to become new owner */
2195
2196 /* New owner must have CREATE privilege on database */
2198 if (aclresult != ACLCHECK_OK)
2201
2203 {
2204 if (form->puballtables || form->puballsequences ||
2206 ereport(ERROR,
2208 errmsg("permission denied to change owner of publication \"%s\"",
2209 NameStr(form->pubname)),
2210 errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
2211 }
2212 }
2213
2214 form->pubowner = newOwnerId;
2215 CatalogTupleUpdate(rel, &tup->t_self, tup);
2216
2217 /* Update owner dependency reference */
2219 form->oid,
2220 newOwnerId);
2221
2223 form->oid, 0);
2224}
2225
2226/*
2227 * Change publication owner -- by name
2228 */
2231{
2232 Oid pubid;
2233 HeapTuple tup;
2234 Relation rel;
2235 ObjectAddress address;
2237
2239
2241
2242 if (!HeapTupleIsValid(tup))
2243 ereport(ERROR,
2245 errmsg("publication \"%s\" does not exist", name)));
2246
2248 pubid = pubform->oid;
2249
2251
2252 ObjectAddressSet(address, PublicationRelationId, pubid);
2253
2255
2257
2258 return address;
2259}
2260
2261/*
2262 * Change publication owner -- by OID
2263 */
2264void
2266{
2267 HeapTuple tup;
2268 Relation rel;
2269
2271
2273
2274 if (!HeapTupleIsValid(tup))
2275 ereport(ERROR,
2277 errmsg("publication with OID %u does not exist", pubid)));
2278
2280
2282
2284}
2285
2286/*
2287 * Extract the publish_generated_columns option value from a DefElem. "stored"
2288 * and "none" values are accepted.
2289 */
2290static char
2292{
2293 char *sval = "";
2294
2295 /*
2296 * A parameter value is required.
2297 */
2298 if (def->arg)
2299 {
2300 sval = defGetString(def);
2301
2302 if (pg_strcasecmp(sval, "none") == 0)
2303 return PUBLISH_GENCOLS_NONE;
2304 if (pg_strcasecmp(sval, "stored") == 0)
2306 }
2307
2308 ereport(ERROR,
2310 errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
2311 errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
2312
2313 return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
2314}
void check_can_set_role(Oid member, Oid role)
Definition acl.c:5371
AclResult
Definition acl.h:183
@ ACLCHECK_OK
Definition acl.h:184
@ ACLCHECK_NOT_OWNER
Definition acl.h:186
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition aclchk.c:2672
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition aclchk.c:3879
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition aclchk.c:4133
int16 AttrNumber
Definition attnum.h:21
#define InvalidAttrNumber
Definition attnum.h:23
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition bitmapset.c:142
int bms_next_member(const Bitmapset *a, int prevbit)
Definition bitmapset.c:1290
void bms_free(Bitmapset *a)
Definition bitmapset.c:239
bool bms_is_member(int x, const Bitmapset *a)
Definition bitmapset.c:510
static Datum values[MAXATTR]
Definition bootstrap.c:188
#define TextDatumGetCString(d)
Definition builtins.h:99
#define NameStr(name)
Definition c.h:837
#define Assert(condition)
Definition c.h:945
#define OidIsValid(objectId)
Definition c.h:860
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition catalog.c:448
char * defGetString(DefElem *def)
Definition define.c:34
bool defGetBoolean(DefElem *def)
Definition define.c:93
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition define.c:370
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition dependency.c:279
int errcode(int sqlerrcode)
Definition elog.c:874
#define _(x)
Definition elog.c:95
int int errdetail_internal(const char *fmt,...) pg_attribute_printf(1
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define WARNING
Definition elog.h:36
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define NOTICE
Definition elog.h:35
#define ereport(elevel,...)
Definition elog.h:150
bool equal(const void *a, const void *b)
Definition equalfuncs.c:223
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, const Node *parsetree)
#define palloc_object(type)
Definition fe_memutils.h:74
#define DirectFunctionCall1(func, arg1)
Definition fmgr.h:684
Oid MyDatabaseId
Definition globals.c:94
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1130
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1037
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
Definition heaptuple.c:456
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1384
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define stmt
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
void CacheInvalidateRelSyncAll(void)
Definition inval.c:1724
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition inval.c:1691
void CacheInvalidateRelSync(Oid relid)
Definition inval.c:1712
void CacheInvalidateRelcacheAll(void)
Definition inval.c:1658
int x
Definition isn.c:75
List * list_concat_unique_oid(List *list1, const List *list2)
Definition list.c:1469
List * lappend(List *list, void *datum)
Definition list.c:339
List * list_difference_oid(const List *list1, const List *list2)
Definition list.c:1313
List * list_concat(List *list1, const List *list2)
Definition list.c:561
List * lappend_oid(List *list, Oid datum)
Definition list.c:375
List * list_append_unique_oid(List *list, Oid datum)
Definition list.c:1380
void list_free(List *list)
Definition list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition list.c:722
void list_free_deep(List *list)
Definition list.c:1560
void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1008
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define AccessShareLock
Definition lockdefs.h:36
#define ShareUpdateExclusiveLock
Definition lockdefs.h:39
#define RowExclusiveLock
Definition lockdefs.h:38
char * get_rel_name(Oid relid)
Definition lsyscache.c:2148
AttrNumber get_attnum(Oid relid, const char *attname)
Definition lsyscache.c:977
char * get_database_name(Oid dbid)
Definition lsyscache.c:1312
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2223
char func_volatile(Oid funcid)
Definition lsyscache.c:2000
char * get_attname(Oid relid, AttrNumber attnum, bool missing_ok)
Definition lsyscache.c:946
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3588
char * pstrdup(const char *in)
Definition mcxt.c:1781
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
Oid GetUserId(void)
Definition miscinit.c:470
Datum namein(PG_FUNCTION_ARGS)
Definition name.c:48
List * fetch_search_path(bool includeImplicit)
Definition namespace.c:4891
Oid get_namespace_oid(const char *nspname, bool missing_ok)
Definition namespace.c:3607
Oid exprType(const Node *expr)
Definition nodeFuncs.c:42
Oid exprInputCollation(const Node *expr)
Definition nodeFuncs.c:1084
bool check_functions_in_node(Node *node, check_function_callback checker, void *context)
Definition nodeFuncs.c:1917
Oid exprCollation(const Node *expr)
Definition nodeFuncs.c:826
int exprLocation(const Node *expr)
Definition nodeFuncs.c:1392
#define expression_tree_walker(n, w, c)
Definition nodeFuncs.h:153
#define IsA(nodeptr, _type_)
Definition nodes.h:164
#define copyObject(obj)
Definition nodes.h:232
#define nodeTag(nodeptr)
Definition nodes.h:139
static char * errmsg
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
ObjectType get_relkind_objtype(char relkind)
const ObjectAddress InvalidObjectAddress
#define ObjectAddressSet(addr, class_id, object_id)
Node * transformWhereClause(ParseState *pstate, Node *clause, ParseExprKind exprKind, const char *constructName)
void assign_expr_collations(ParseState *pstate, Node *expr)
void free_parsestate(ParseState *pstate)
Definition parse_node.c:72
int parser_errposition(ParseState *pstate, int location)
Definition parse_node.c:106
ParseState * make_parsestate(ParseState *parentParseState)
Definition parse_node.c:39
@ EXPR_KIND_WHERE
Definition parse_node.h:46
void addNSItemToQuery(ParseState *pstate, ParseNamespaceItem *nsitem, bool addToJoinList, bool addToRelNameSpace, bool addToVarNameSpace)
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, LOCKMODE lockmode, Alias *alias, bool inh, bool inFromCl)
@ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA
@ PUBLICATIONOBJ_TABLES_IN_SCHEMA
@ PUBLICATIONOBJ_TABLE
@ PUBLICATIONOBJ_EXCEPT_TABLE
@ AP_DropObjects
@ AP_SetObjects
@ AP_AddObjects
@ DROP_CASCADE
@ OBJECT_DATABASE
@ OBJECT_PUBLICATION
#define ACL_CREATE
Definition parsenodes.h:85
int16 attnum
FormData_pg_attribute * Form_pg_attribute
NameData relname
Definition pg_class.h:40
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
#define lfirst(lc)
Definition pg_list.h:172
#define lfirst_node(type, lc)
Definition pg_list.h:176
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define foreach_oid(var, lst)
Definition pg_list.h:471
#define linitial_oid(l)
Definition pg_list.h:180
#define lfirst_oid(lc)
Definition pg_list.h:174
Bitmapset * pub_collist_validate(Relation targetrel, List *columns)
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
bool is_schema_publication(Oid pubid)
ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
List * GetPublicationSchemas(Oid pubid)
List * GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
List * GetIncludedPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists, AlterPublicationStmt *alter_stmt)
Publication * GetPublication(Oid pubid)
bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols)
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
bool is_table_publication(Oid pubid)
List * GetExcludedPublicationTables(Oid pubid, PublicationPartOpt pub_partopt)
END_CATALOG_STRUCT typedef FormData_pg_publication * Form_pg_publication
@ PUBLICATION_PART_ROOT
@ PUBLICATION_PART_ALL
END_CATALOG_STRUCT typedef FormData_pg_publication_namespace * Form_pg_publication_namespace
END_CATALOG_STRUCT typedef FormData_pg_publication_rel * Form_pg_publication_rel
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
int pg_strcasecmp(const char *s1, const char *s2)
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static Datum CStringGetDatum(const char *X)
Definition postgres.h:370
static Datum CharGetDatum(char X)
Definition postgres.h:132
unsigned int Oid
static int fb(int x)
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt)
static bool contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
bool pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot, char pubgencols_type, bool *invalid_column_list, bool *invalid_gen_col)
static void AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, List *schemaidlist)
void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
void InvalidatePublicationRels(List *relids)
static void CloseTableList(List *rels)
void RemovePublicationSchemaById(Oid psoid)
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
static void TransformPubWhereClauses(List *tables, const char *queryString, bool pubviaroot)
ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
static void AlterPublicationAllFlags(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
void InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist)
static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context)
void RemovePublicationById(Oid pubid)
static List * OpenTableList(List *tables)
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, List **rels, List **exceptrels, List **schemas)
static bool check_simple_rowfilter_expr(Node *node, ParseState *pstate)
static char defGetGeneratedColsOption(DefElem *def)
void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root, bool *publish_generated_columns_given, char *publish_generated_columns)
void RemovePublicationRelById(Oid proid)
ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId)
static void CheckPubRelationColumnList(char *pubname, List *tables, bool publish_schema, bool pubviaroot)
static void AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, const char *queryString, bool publish_schema)
static void LockSchemaList(List *schemalist)
static bool check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
#define MAX_RELCACHE_INVAL_MSGS
void * stringToNode(const char *str)
Definition read.c:90
#define RelationGetRelid(relation)
Definition rel.h:514
#define RelationGetDescr(relation)
Definition rel.h:540
#define RelationGetRelationName(relation)
Definition rel.h:548
#define RelationGetNamespace(relation)
Definition rel.h:555
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition relcache.c:5293
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
Definition relcache.h:71
Node * expand_generated_columns_in_expr(Node *node, Relation rel, int rt_index)
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
char * defname
Definition parsenodes.h:857
Node * arg
Definition parsenodes.h:858
Definition pg_list.h:54
Definition nodes.h:135
const char * p_sourcetext
Definition parse_node.h:210
RangeVar * relation
bool inh
Definition primnodes.h:87
TupleDesc rd_att
Definition rel.h:112
Form_pg_class rd_rel
Definition rel.h:111
bool has_generated_virtual
Definition tupdesc.h:47
bool has_generated_stored
Definition tupdesc.h:46
TupleConstr * constr
Definition tupdesc.h:159
AttrNumber varattno
Definition primnodes.h:275
Bitmapset * bms_replident
bool superuser_arg(Oid roleid)
Definition superuser.c:57
bool superuser(void)
Definition superuser.c:47
#define FirstLowInvalidHeapAttributeNumber
Definition sysattr.h:27
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache2(SysCacheIdentifier cacheId, Datum key1, Datum key2)
Definition syscache.c:230
HeapTuple SearchSysCache1(SysCacheIdentifier cacheId, Datum key1)
Definition syscache.c:220
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:595
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
#define SearchSysCacheExists1(cacheId, key1)
Definition syscache.h:100
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition syscache.h:109
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition syscache.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition table.c:83
#define FirstNormalObjectId
Definition transam.h:197
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition tupdesc.h:178
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition varlena.c:2777
const char * name
void CommandCounterIncrement(void)
Definition xact.c:1102
int wal_level
Definition xlog.c:135
@ WAL_LEVEL_REPLICA
Definition xlog.h:76