PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
publicationcmds.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * publicationcmds.c
4 * publication manipulation
5 *
6 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/commands/publicationcmds.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/htup_details.h"
18#include "access/table.h"
19#include "access/xact.h"
20#include "catalog/catalog.h"
21#include "catalog/indexing.h"
22#include "catalog/namespace.h"
25#include "catalog/pg_database.h"
26#include "catalog/pg_inherits.h"
28#include "catalog/pg_proc.h"
32#include "commands/dbcommands.h"
33#include "commands/defrem.h"
36#include "miscadmin.h"
37#include "nodes/nodeFuncs.h"
38#include "parser/parse_clause.h"
42#include "storage/lmgr.h"
43#include "utils/acl.h"
44#include "utils/builtins.h"
45#include "utils/inval.h"
46#include "utils/lsyscache.h"
47#include "utils/rel.h"
48#include "utils/syscache.h"
49#include "utils/varlena.h"
50
51
52/*
53 * Information used to validate the columns in the row filter expression. See
54 * contain_invalid_rfcolumn_walker for details.
55 */
56typedef struct rf_context
57{
58 Bitmapset *bms_replident; /* bitset of replica identity columns */
59 bool pubviaroot; /* true if we are validating the parent
60 * relation's row filter */
61 Oid relid; /* relid of the relation */
62 Oid parentid; /* relid of the parent relation */
64
65static List *OpenTableList(List *tables);
66static void CloseTableList(List *rels);
67static void LockSchemaList(List *schemalist);
68static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
70static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
71static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
73static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
74static char defGetGeneratedColsOption(DefElem *def);
75
76
77static void
80 bool *publish_given,
81 PublicationActions *pubactions,
82 bool *publish_via_partition_root_given,
83 bool *publish_via_partition_root,
84 bool *publish_generated_columns_given,
85 char *publish_generated_columns)
86{
87 ListCell *lc;
88
89 *publish_given = false;
90 *publish_via_partition_root_given = false;
91 *publish_generated_columns_given = false;
92
93 /* defaults */
94 pubactions->pubinsert = true;
95 pubactions->pubupdate = true;
96 pubactions->pubdelete = true;
97 pubactions->pubtruncate = true;
98 *publish_via_partition_root = false;
99 *publish_generated_columns = PUBLISH_GENCOLS_NONE;
100
101 /* Parse options */
102 foreach(lc, options)
103 {
104 DefElem *defel = (DefElem *) lfirst(lc);
105
106 if (strcmp(defel->defname, "publish") == 0)
107 {
108 char *publish;
109 List *publish_list;
110 ListCell *lc2;
111
112 if (*publish_given)
113 errorConflictingDefElem(defel, pstate);
114
115 /*
116 * If publish option was given only the explicitly listed actions
117 * should be published.
118 */
119 pubactions->pubinsert = false;
120 pubactions->pubupdate = false;
121 pubactions->pubdelete = false;
122 pubactions->pubtruncate = false;
123
124 *publish_given = true;
125 publish = defGetString(defel);
126
127 if (!SplitIdentifierString(publish, ',', &publish_list))
129 (errcode(ERRCODE_SYNTAX_ERROR),
130 errmsg("invalid list syntax in parameter \"%s\"",
131 "publish")));
132
133 /* Process the option list. */
134 foreach(lc2, publish_list)
135 {
136 char *publish_opt = (char *) lfirst(lc2);
137
138 if (strcmp(publish_opt, "insert") == 0)
139 pubactions->pubinsert = true;
140 else if (strcmp(publish_opt, "update") == 0)
141 pubactions->pubupdate = true;
142 else if (strcmp(publish_opt, "delete") == 0)
143 pubactions->pubdelete = true;
144 else if (strcmp(publish_opt, "truncate") == 0)
145 pubactions->pubtruncate = true;
146 else
148 (errcode(ERRCODE_SYNTAX_ERROR),
149 errmsg("unrecognized value for publication option \"%s\": \"%s\"",
150 "publish", publish_opt)));
151 }
152 }
153 else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
154 {
155 if (*publish_via_partition_root_given)
156 errorConflictingDefElem(defel, pstate);
157 *publish_via_partition_root_given = true;
158 *publish_via_partition_root = defGetBoolean(defel);
159 }
160 else if (strcmp(defel->defname, "publish_generated_columns") == 0)
161 {
162 if (*publish_generated_columns_given)
163 errorConflictingDefElem(defel, pstate);
164 *publish_generated_columns_given = true;
165 *publish_generated_columns = defGetGeneratedColsOption(defel);
166 }
167 else
169 (errcode(ERRCODE_SYNTAX_ERROR),
170 errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
171 }
172}
173
174/*
175 * Convert the PublicationObjSpecType list into schema oid list and
176 * PublicationTable list.
177 */
178static void
180 List **rels, List **schemas)
181{
182 ListCell *cell;
183 PublicationObjSpec *pubobj;
184
185 if (!pubobjspec_list)
186 return;
187
188 foreach(cell, pubobjspec_list)
189 {
190 Oid schemaid;
191 List *search_path;
192
193 pubobj = (PublicationObjSpec *) lfirst(cell);
194
195 switch (pubobj->pubobjtype)
196 {
198 *rels = lappend(*rels, pubobj->pubtable);
199 break;
201 schemaid = get_namespace_oid(pubobj->name, false);
202
203 /* Filter out duplicates if user specifies "sch1, sch1" */
204 *schemas = list_append_unique_oid(*schemas, schemaid);
205 break;
207 search_path = fetch_search_path(false);
208 if (search_path == NIL) /* nothing valid in search_path? */
210 errcode(ERRCODE_UNDEFINED_SCHEMA),
211 errmsg("no schema has been selected for CURRENT_SCHEMA"));
212
213 schemaid = linitial_oid(search_path);
214 list_free(search_path);
215
216 /* Filter out duplicates if user specifies "sch1, sch1" */
217 *schemas = list_append_unique_oid(*schemas, schemaid);
218 break;
219 default:
220 /* shouldn't happen */
221 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
222 break;
223 }
224 }
225}
226
227/*
228 * Returns true if any of the columns used in the row filter WHERE expression is
229 * not part of REPLICA IDENTITY, false otherwise.
230 */
231static bool
233{
234 if (node == NULL)
235 return false;
236
237 if (IsA(node, Var))
238 {
239 Var *var = (Var *) node;
241
242 /*
243 * If pubviaroot is true, we are validating the row filter of the
244 * parent table, but the bitmap contains the replica identity
245 * information of the child table. So, get the column number of the
246 * child table as parent and child column order could be different.
247 */
248 if (context->pubviaroot)
249 {
250 char *colname = get_attname(context->parentid, attnum, false);
251
252 attnum = get_attnum(context->relid, colname);
253 }
254
256 context->bms_replident))
257 return true;
258 }
259
261 context);
262}
263
264/*
265 * Check if all columns referenced in the filter expression are part of the
266 * REPLICA IDENTITY index or not.
267 *
268 * Returns true if any invalid column is found.
269 */
270bool
272 bool pubviaroot)
273{
274 HeapTuple rftuple;
275 Oid relid = RelationGetRelid(relation);
276 Oid publish_as_relid = RelationGetRelid(relation);
277 bool result = false;
278 Datum rfdatum;
279 bool rfisnull;
280
281 /*
282 * FULL means all columns are in the REPLICA IDENTITY, so all columns are
283 * allowed in the row filter and we can skip the validation.
284 */
285 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
286 return false;
287
288 /*
289 * For a partition, if pubviaroot is true, find the topmost ancestor that
290 * is published via this publication as we need to use its row filter
291 * expression to filter the partition's changes.
292 *
293 * Note that even though the row filter used is for an ancestor, the
294 * REPLICA IDENTITY used will be for the actual child table.
295 */
296 if (pubviaroot && relation->rd_rel->relispartition)
297 {
298 publish_as_relid
299 = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
300
301 if (!OidIsValid(publish_as_relid))
302 publish_as_relid = relid;
303 }
304
305 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
306 ObjectIdGetDatum(publish_as_relid),
307 ObjectIdGetDatum(pubid));
308
309 if (!HeapTupleIsValid(rftuple))
310 return false;
311
312 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
313 Anum_pg_publication_rel_prqual,
314 &rfisnull);
315
316 if (!rfisnull)
317 {
318 rf_context context = {0};
319 Node *rfnode;
320 Bitmapset *bms = NULL;
321
322 context.pubviaroot = pubviaroot;
323 context.parentid = publish_as_relid;
324 context.relid = relid;
325
326 /* Remember columns that are part of the REPLICA IDENTITY */
327 bms = RelationGetIndexAttrBitmap(relation,
329
330 context.bms_replident = bms;
331 rfnode = stringToNode(TextDatumGetCString(rfdatum));
332 result = contain_invalid_rfcolumn_walker(rfnode, &context);
333 }
334
335 ReleaseSysCache(rftuple);
336
337 return result;
338}
339
340/*
341 * Check for invalid columns in the publication table definition.
342 *
343 * This function evaluates two conditions:
344 *
345 * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
346 * by the column list. If any column is missing, *invalid_column_list is set
347 * to true.
348 * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
349 * are published, either by being explicitly named in the column list or, if
350 * no column list is specified, by setting the option
351 * publish_generated_columns to stored. If any unpublished
352 * generated column is found, *invalid_gen_col is set to true.
353 *
354 * Returns true if any of the above conditions are not met.
355 */
356bool
357pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
358 bool pubviaroot, char pubgencols_type,
359 bool *invalid_column_list,
360 bool *invalid_gen_col)
361{
362 Oid relid = RelationGetRelid(relation);
363 Oid publish_as_relid = RelationGetRelid(relation);
364 Bitmapset *idattrs;
365 Bitmapset *columns = NULL;
366 TupleDesc desc = RelationGetDescr(relation);
367 Publication *pub;
368 int x;
369
370 *invalid_column_list = false;
371 *invalid_gen_col = false;
372
373 /*
374 * For a partition, if pubviaroot is true, find the topmost ancestor that
375 * is published via this publication as we need to use its column list for
376 * the changes.
377 *
378 * Note that even though the column list used is for an ancestor, the
379 * REPLICA IDENTITY used will be for the actual child table.
380 */
381 if (pubviaroot && relation->rd_rel->relispartition)
382 {
383 publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
384
385 if (!OidIsValid(publish_as_relid))
386 publish_as_relid = relid;
387 }
388
389 /* Fetch the column list */
390 pub = GetPublication(pubid);
391 check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
392
393 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
394 {
395 /* With REPLICA IDENTITY FULL, no column list is allowed. */
396 *invalid_column_list = (columns != NULL);
397
398 /*
399 * As we don't allow a column list with REPLICA IDENTITY FULL, the
400 * publish_generated_columns option must be set to stored if the table
401 * has any stored generated columns.
402 */
403 if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
404 relation->rd_att->constr &&
406 *invalid_gen_col = true;
407
408 /*
409 * Virtual generated columns are currently not supported for logical
410 * replication at all.
411 */
412 if (relation->rd_att->constr &&
414 *invalid_gen_col = true;
415
416 if (*invalid_gen_col && *invalid_column_list)
417 return true;
418 }
419
420 /* Remember columns that are part of the REPLICA IDENTITY */
421 idattrs = RelationGetIndexAttrBitmap(relation,
423
424 /*
425 * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
426 * (to handle system columns the usual way), while column list does not
427 * use offset, so we can't do bms_is_subset(). Instead, we have to loop
428 * over the idattrs and check all of them are in the list.
429 */
430 x = -1;
431 while ((x = bms_next_member(idattrs, x)) >= 0)
432 {
434 Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
435
436 if (columns == NULL)
437 {
438 /*
439 * The publish_generated_columns option must be set to stored if
440 * the REPLICA IDENTITY contains any stored generated column.
441 */
442 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
443 {
444 *invalid_gen_col = true;
445 break;
446 }
447
448 /*
449 * The equivalent setting for virtual generated columns does not
450 * exist yet.
451 */
452 if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
453 {
454 *invalid_gen_col = true;
455 break;
456 }
457
458 /* Skip validating the column list since it is not defined */
459 continue;
460 }
461
462 /*
463 * If pubviaroot is true, we are validating the column list of the
464 * parent table, but the bitmap contains the replica identity
465 * information of the child table. The parent/child attnums may not
466 * match, so translate them to the parent - get the attname from the
467 * child, and look it up in the parent.
468 */
469 if (pubviaroot)
470 {
471 /* attribute name in the child table */
472 char *colname = get_attname(relid, attnum, false);
473
474 /*
475 * Determine the attnum for the attribute name in parent (we are
476 * using the column list defined on the parent).
477 */
478 attnum = get_attnum(publish_as_relid, colname);
479 }
480
481 /* replica identity column, not covered by the column list */
482 *invalid_column_list |= !bms_is_member(attnum, columns);
483
484 if (*invalid_column_list && *invalid_gen_col)
485 break;
486 }
487
488 bms_free(columns);
489 bms_free(idattrs);
490
491 return *invalid_column_list || *invalid_gen_col;
492}
493
494/*
495 * Invalidate entries in the RelationSyncCache for relations included in the
496 * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
497 *
498 * If 'puballtables' is true, invalidate all cache entries.
499 */
500void
501InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
502{
503 if (puballtables)
504 {
506 }
507 else
508 {
509 List *relids = NIL;
510 List *schemarelids = NIL;
511
512 /*
513 * For partitioned tables, we must invalidate all partitions and
514 * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
515 * a target. However, WAL records for TRUNCATE specify both a root and
516 * its leaves.
517 */
518 relids = GetPublicationRelations(pubid,
520 schemarelids = GetAllSchemaPublicationRelations(pubid,
522
523 relids = list_concat_unique_oid(relids, schemarelids);
524
525 /* Invalidate the relsyncache */
526 foreach_oid(relid, relids)
528 }
529
530 return;
531}
532
533/* check_functions_in_node callback */
534static bool
536{
537 return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
538 func_id >= FirstNormalObjectId);
539}
540
541/*
542 * The row filter walker checks if the row filter expression is a "simple
543 * expression".
544 *
545 * It allows only simple or compound expressions such as:
546 * - (Var Op Const)
547 * - (Var Op Var)
548 * - (Var Op Const) AND/OR (Var Op Const)
549 * - etc
550 * (where Var is a column of the table this filter belongs to)
551 *
552 * The simple expression has the following restrictions:
553 * - User-defined operators are not allowed;
554 * - User-defined functions are not allowed;
555 * - User-defined types are not allowed;
556 * - User-defined collations are not allowed;
557 * - Non-immutable built-in functions are not allowed;
558 * - System columns are not allowed.
559 *
560 * NOTES
561 *
562 * We don't allow user-defined functions/operators/types/collations because
563 * (a) if a user drops a user-defined object used in a row filter expression or
564 * if there is any other error while using it, the logical decoding
565 * infrastructure won't be able to recover from such an error even if the
566 * object is recreated again because a historic snapshot is used to evaluate
567 * the row filter;
568 * (b) a user-defined function can be used to access tables that could have
569 * unpleasant results because a historic snapshot is used. That's why only
570 * immutable built-in functions are allowed in row filter expressions.
571 *
572 * We don't allow system columns because currently, we don't have that
573 * information in the tuple passed to downstream. Also, as we don't replicate
574 * those to subscribers, there doesn't seem to be a need for a filter on those
575 * columns.
576 *
577 * We can allow other node types after more analysis and testing.
578 */
579static bool
581{
582 char *errdetail_msg = NULL;
583
584 if (node == NULL)
585 return false;
586
587 switch (nodeTag(node))
588 {
589 case T_Var:
590 /* System columns are not allowed. */
591 if (((Var *) node)->varattno < InvalidAttrNumber)
592 errdetail_msg = _("System columns are not allowed.");
593 break;
594 case T_OpExpr:
595 case T_DistinctExpr:
596 case T_NullIfExpr:
597 /* OK, except user-defined operators are not allowed. */
598 if (((OpExpr *) node)->opno >= FirstNormalObjectId)
599 errdetail_msg = _("User-defined operators are not allowed.");
600 break;
601 case T_ScalarArrayOpExpr:
602 /* OK, except user-defined operators are not allowed. */
603 if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
604 errdetail_msg = _("User-defined operators are not allowed.");
605
606 /*
607 * We don't need to check the hashfuncid and negfuncid of
608 * ScalarArrayOpExpr as those functions are only built for a
609 * subquery.
610 */
611 break;
612 case T_RowCompareExpr:
613 {
614 ListCell *opid;
615
616 /* OK, except user-defined operators are not allowed. */
617 foreach(opid, ((RowCompareExpr *) node)->opnos)
618 {
619 if (lfirst_oid(opid) >= FirstNormalObjectId)
620 {
621 errdetail_msg = _("User-defined operators are not allowed.");
622 break;
623 }
624 }
625 }
626 break;
627 case T_Const:
628 case T_FuncExpr:
629 case T_BoolExpr:
630 case T_RelabelType:
631 case T_CollateExpr:
632 case T_CaseExpr:
633 case T_CaseTestExpr:
634 case T_ArrayExpr:
635 case T_RowExpr:
636 case T_CoalesceExpr:
637 case T_MinMaxExpr:
638 case T_XmlExpr:
639 case T_NullTest:
640 case T_BooleanTest:
641 case T_List:
642 /* OK, supported */
643 break;
644 default:
645 errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
646 break;
647 }
648
649 /*
650 * For all the supported nodes, if we haven't already found a problem,
651 * check the types, functions, and collations used in it. We check List
652 * by walking through each element.
653 */
654 if (!errdetail_msg && !IsA(node, List))
655 {
656 if (exprType(node) >= FirstNormalObjectId)
657 errdetail_msg = _("User-defined types are not allowed.");
659 pstate))
660 errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
661 else if (exprCollation(node) >= FirstNormalObjectId ||
663 errdetail_msg = _("User-defined collations are not allowed.");
664 }
665
666 /*
667 * If we found a problem in this node, throw error now. Otherwise keep
668 * going.
669 */
670 if (errdetail_msg)
672 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
673 errmsg("invalid publication WHERE expression"),
674 errdetail_internal("%s", errdetail_msg),
675 parser_errposition(pstate, exprLocation(node))));
676
678 pstate);
679}
680
681/*
682 * Check if the row filter expression is a "simple expression".
683 *
684 * See check_simple_rowfilter_expr_walker for details.
685 */
686static bool
688{
689 return check_simple_rowfilter_expr_walker(node, pstate);
690}
691
692/*
693 * Transform the publication WHERE expression for all the relations in the list,
694 * ensuring it is coerced to boolean and necessary collation information is
695 * added if required, and add a new nsitem/RTE for the associated relation to
696 * the ParseState's namespace list.
697 *
698 * Also check the publication row filter expression and throw an error if
699 * anything not permitted or unexpected is encountered.
700 */
701static void
702TransformPubWhereClauses(List *tables, const char *queryString,
703 bool pubviaroot)
704{
705 ListCell *lc;
706
707 foreach(lc, tables)
708 {
709 ParseNamespaceItem *nsitem;
710 Node *whereclause = NULL;
711 ParseState *pstate;
713
714 if (pri->whereClause == NULL)
715 continue;
716
717 /*
718 * If the publication doesn't publish changes via the root partitioned
719 * table, the partition's row filter will be used. So disallow using
720 * WHERE clause on partitioned table in this case.
721 */
722 if (!pubviaroot &&
723 pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
725 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
726 errmsg("cannot use publication WHERE clause for relation \"%s\"",
728 errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
729 "publish_via_partition_root")));
730
731 /*
732 * A fresh pstate is required so that we only have "this" table in its
733 * rangetable
734 */
735 pstate = make_parsestate(NULL);
736 pstate->p_sourcetext = queryString;
737 nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
738 AccessShareLock, NULL,
739 false, false);
740 addNSItemToQuery(pstate, nsitem, false, true, true);
741
742 whereclause = transformWhereClause(pstate,
745 "PUBLICATION WHERE");
746
747 /* Fix up collation information */
748 assign_expr_collations(pstate, whereclause);
749
750 whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
751
752 /*
753 * We allow only simple expressions in row filters. See
754 * check_simple_rowfilter_expr_walker.
755 */
756 check_simple_rowfilter_expr(whereclause, pstate);
757
758 free_parsestate(pstate);
759
760 pri->whereClause = whereclause;
761 }
762}
763
764
765/*
766 * Given a list of tables that are going to be added to a publication,
767 * verify that they fulfill the necessary preconditions, namely: no tables
768 * have a column list if any schema is published; and partitioned tables do
769 * not have column lists if publish_via_partition_root is not set.
770 *
771 * 'publish_schema' indicates that the publication contains any TABLES IN
772 * SCHEMA elements (newly added in this command, or preexisting).
773 * 'pubviaroot' is the value of publish_via_partition_root.
774 */
775static void
776CheckPubRelationColumnList(char *pubname, List *tables,
777 bool publish_schema, bool pubviaroot)
778{
779 ListCell *lc;
780
781 foreach(lc, tables)
782 {
784
785 if (pri->columns == NIL)
786 continue;
787
788 /*
789 * Disallow specifying column list if any schema is in the
790 * publication.
791 *
792 * XXX We could instead just forbid the case when the publication
793 * tries to publish the table with a column list and a schema for that
794 * table. However, if we do that then we need a restriction during
795 * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
796 * seem to be a good idea.
797 */
798 if (publish_schema)
800 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
801 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
803 RelationGetRelationName(pri->relation), pubname),
804 errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
805
806 /*
807 * If the publication doesn't publish changes via the root partitioned
808 * table, the partition's column list will be used. So disallow using
809 * a column list on the partitioned table in this case.
810 */
811 if (!pubviaroot &&
812 pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
814 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
815 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
817 RelationGetRelationName(pri->relation), pubname),
818 errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
819 "publish_via_partition_root")));
820 }
821}
822
823/*
824 * Create new publication.
825 */
828{
829 Relation rel;
830 ObjectAddress myself;
831 Oid puboid;
832 bool nulls[Natts_pg_publication];
833 Datum values[Natts_pg_publication];
834 HeapTuple tup;
835 bool publish_given;
836 PublicationActions pubactions;
837 bool publish_via_partition_root_given;
838 bool publish_via_partition_root;
839 bool publish_generated_columns_given;
840 char publish_generated_columns;
841 AclResult aclresult;
842 List *relations = NIL;
843 List *schemaidlist = NIL;
844
845 /* must have CREATE privilege on database */
846 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
847 if (aclresult != ACLCHECK_OK)
850
851 /* FOR ALL TABLES requires superuser */
852 if (stmt->for_all_tables && !superuser())
854 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
855 errmsg("must be superuser to create FOR ALL TABLES publication")));
856
857 rel = table_open(PublicationRelationId, RowExclusiveLock);
858
859 /* Check if name is used */
860 puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
861 CStringGetDatum(stmt->pubname));
862 if (OidIsValid(puboid))
865 errmsg("publication \"%s\" already exists",
866 stmt->pubname)));
867
868 /* Form a tuple. */
869 memset(values, 0, sizeof(values));
870 memset(nulls, false, sizeof(nulls));
871
872 values[Anum_pg_publication_pubname - 1] =
874 values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
875
877 stmt->options,
878 &publish_given, &pubactions,
879 &publish_via_partition_root_given,
880 &publish_via_partition_root,
881 &publish_generated_columns_given,
882 &publish_generated_columns);
883
884 puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
885 Anum_pg_publication_oid);
886 values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
887 values[Anum_pg_publication_puballtables - 1] =
888 BoolGetDatum(stmt->for_all_tables);
889 values[Anum_pg_publication_pubinsert - 1] =
890 BoolGetDatum(pubactions.pubinsert);
891 values[Anum_pg_publication_pubupdate - 1] =
892 BoolGetDatum(pubactions.pubupdate);
893 values[Anum_pg_publication_pubdelete - 1] =
894 BoolGetDatum(pubactions.pubdelete);
895 values[Anum_pg_publication_pubtruncate - 1] =
896 BoolGetDatum(pubactions.pubtruncate);
897 values[Anum_pg_publication_pubviaroot - 1] =
898 BoolGetDatum(publish_via_partition_root);
899 values[Anum_pg_publication_pubgencols - 1] =
900 CharGetDatum(publish_generated_columns);
901
902 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
903
904 /* Insert tuple into catalog. */
905 CatalogTupleInsert(rel, tup);
906 heap_freetuple(tup);
907
908 recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
909
910 ObjectAddressSet(myself, PublicationRelationId, puboid);
911
912 /* Make the changes visible. */
914
915 /* Associate objects with the publication. */
916 if (stmt->for_all_tables)
917 {
918 /* Invalidate relcache so that publication info is rebuilt. */
920 }
921 else
922 {
923 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
924 &schemaidlist);
925
926 /* FOR TABLES IN SCHEMA requires superuser */
927 if (schemaidlist != NIL && !superuser())
929 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
930 errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
931
932 if (relations != NIL)
933 {
934 List *rels;
935
936 rels = OpenTableList(relations);
938 publish_via_partition_root);
939
940 CheckPubRelationColumnList(stmt->pubname, rels,
941 schemaidlist != NIL,
942 publish_via_partition_root);
943
944 PublicationAddTables(puboid, rels, true, NULL);
945 CloseTableList(rels);
946 }
947
948 if (schemaidlist != NIL)
949 {
950 /*
951 * Schema lock is held until the publication is created to prevent
952 * concurrent schema deletion.
953 */
954 LockSchemaList(schemaidlist);
955 PublicationAddSchemas(puboid, schemaidlist, true, NULL);
956 }
957 }
958
960
961 InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
962
965 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
966 errmsg("\"wal_level\" is insufficient to publish logical changes"),
967 errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
968
969 return myself;
970}
971
972/*
973 * Change options of a publication.
974 */
975static void
977 Relation rel, HeapTuple tup)
978{
979 bool nulls[Natts_pg_publication];
980 bool replaces[Natts_pg_publication];
981 Datum values[Natts_pg_publication];
982 bool publish_given;
983 PublicationActions pubactions;
984 bool publish_via_partition_root_given;
985 bool publish_via_partition_root;
986 bool publish_generated_columns_given;
987 char publish_generated_columns;
988 ObjectAddress obj;
989 Form_pg_publication pubform;
990 List *root_relids = NIL;
991 ListCell *lc;
992
994 stmt->options,
995 &publish_given, &pubactions,
996 &publish_via_partition_root_given,
997 &publish_via_partition_root,
998 &publish_generated_columns_given,
999 &publish_generated_columns);
1000
1001 pubform = (Form_pg_publication) GETSTRUCT(tup);
1002
1003 /*
1004 * If the publication doesn't publish changes via the root partitioned
1005 * table, the partition's row filter and column list will be used. So
1006 * disallow using WHERE clause and column lists on partitioned table in
1007 * this case.
1008 */
1009 if (!pubform->puballtables && publish_via_partition_root_given &&
1010 !publish_via_partition_root)
1011 {
1012 /*
1013 * Lock the publication so nobody else can do anything with it. This
1014 * prevents concurrent alter to add partitioned table(s) with WHERE
1015 * clause(s) and/or column lists which we don't allow when not
1016 * publishing via root.
1017 */
1018 LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
1020
1021 root_relids = GetPublicationRelations(pubform->oid,
1023
1024 foreach(lc, root_relids)
1025 {
1026 Oid relid = lfirst_oid(lc);
1027 HeapTuple rftuple;
1028 char relkind;
1029 char *relname;
1030 bool has_rowfilter;
1031 bool has_collist;
1032
1033 /*
1034 * Beware: we don't have lock on the relations, so cope silently
1035 * with the cache lookups returning NULL.
1036 */
1037
1038 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1039 ObjectIdGetDatum(relid),
1040 ObjectIdGetDatum(pubform->oid));
1041 if (!HeapTupleIsValid(rftuple))
1042 continue;
1043 has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1044 has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1045 if (!has_rowfilter && !has_collist)
1046 {
1047 ReleaseSysCache(rftuple);
1048 continue;
1049 }
1050
1051 relkind = get_rel_relkind(relid);
1052 if (relkind != RELKIND_PARTITIONED_TABLE)
1053 {
1054 ReleaseSysCache(rftuple);
1055 continue;
1056 }
1057 relname = get_rel_name(relid);
1058 if (relname == NULL) /* table concurrently dropped */
1059 {
1060 ReleaseSysCache(rftuple);
1061 continue;
1062 }
1063
1064 if (has_rowfilter)
1065 ereport(ERROR,
1066 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1067 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1068 "publish_via_partition_root",
1069 stmt->pubname),
1070 errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1071 relname, "publish_via_partition_root")));
1072 Assert(has_collist);
1073 ereport(ERROR,
1074 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1075 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1076 "publish_via_partition_root",
1077 stmt->pubname),
1078 errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1079 relname, "publish_via_partition_root")));
1080 }
1081 }
1082
1083 /* Everything ok, form a new tuple. */
1084 memset(values, 0, sizeof(values));
1085 memset(nulls, false, sizeof(nulls));
1086 memset(replaces, false, sizeof(replaces));
1087
1088 if (publish_given)
1089 {
1090 values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1091 replaces[Anum_pg_publication_pubinsert - 1] = true;
1092
1093 values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1094 replaces[Anum_pg_publication_pubupdate - 1] = true;
1095
1096 values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1097 replaces[Anum_pg_publication_pubdelete - 1] = true;
1098
1099 values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1100 replaces[Anum_pg_publication_pubtruncate - 1] = true;
1101 }
1102
1103 if (publish_via_partition_root_given)
1104 {
1105 values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1106 replaces[Anum_pg_publication_pubviaroot - 1] = true;
1107 }
1108
1109 if (publish_generated_columns_given)
1110 {
1111 values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
1112 replaces[Anum_pg_publication_pubgencols - 1] = true;
1113 }
1114
1115 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1116 replaces);
1117
1118 /* Update the catalog. */
1119 CatalogTupleUpdate(rel, &tup->t_self, tup);
1120
1122
1123 pubform = (Form_pg_publication) GETSTRUCT(tup);
1124
1125 /* Invalidate the relcache. */
1126 if (pubform->puballtables)
1127 {
1129 }
1130 else
1131 {
1132 List *relids = NIL;
1133 List *schemarelids = NIL;
1134
1135 /*
1136 * For any partitioned tables contained in the publication, we must
1137 * invalidate all partitions contained in the respective partition
1138 * trees, not just those explicitly mentioned in the publication.
1139 */
1140 if (root_relids == NIL)
1141 relids = GetPublicationRelations(pubform->oid,
1143 else
1144 {
1145 /*
1146 * We already got tables explicitly mentioned in the publication.
1147 * Now get all partitions for the partitioned table in the list.
1148 */
1149 foreach(lc, root_relids)
1150 relids = GetPubPartitionOptionRelations(relids,
1152 lfirst_oid(lc));
1153 }
1154
1155 schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1157 relids = list_concat_unique_oid(relids, schemarelids);
1158
1160 }
1161
1162 ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1164 (Node *) stmt);
1165
1166 InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1167}
1168
1169/*
1170 * Invalidate the relations.
 *
 * Iterates over the list 'relids'.  Per the comment in the body, a list that
 * would need too many individual messages is presumably handled by resetting
 * the whole relcache instead -- confirm against the upstream file.
 *
 * NOTE(review): this listing skips source lines 1173, 1179, 1184 and 1187,
 * so the function name and parameter list, the if-condition, and the
 * statements under "foreach" and "else" are not shown here.  Restore them
 * from the upstream file before relying on this text.
1171 */
1172void
1174{
1175 /*
1176 * We don't want to send too many individual messages, at some point it's
1177 * cheaper to just reset whole relcache.
1178 */
1180 {
1181 ListCell *lc;
1182
1183 foreach(lc, relids)
1185 }
1186 else
1188}
1189
1190/*
1191 * Add or remove table to/from publication.
 *
 * Behaviour depends on stmt->action:
 *   AP_AddObjects  - transform the WHERE clauses, check the column lists and
 *                    add the opened relations to the publication;
 *   AP_DropObjects - remove the opened relations from the publication;
 *   AP_SetObjects  - drop every current member that has no entry in 'tables'
 *                    with an equal WHERE clause and column list, then add
 *                    the new list with if_not_exists = true.
 *
 * 'tables' is handed to OpenTableList() and the result is released with
 * CloseTableList() before returning.  'queryString' is forwarded to
 * TransformPubWhereClauses().  'publish_schema' is forwarded to
 * CheckPubRelationColumnList(); in the ADD case it is first OR'ed with
 * is_schema_publication(pubid).
 *
 * NOTE(review): this listing skips source lines 1194, 1199, 1228 and 1330:
 * the line carrying the function name and leading parameters, one local
 * declaration (presumably 'pubform' -- confirm), and the final argument of
 * the GetPublicationRelations() and table_open() calls below.
1192 */
1193static void
1195 List *tables, const char *queryString,
1196 bool publish_schema)
1197{
1198 List *rels = NIL;
1200 Oid pubid = pubform->oid;
1201
1202 /*
1203 * Nothing to do if no objects, except in SET: for that it is quite
1204 * possible that user has not specified any tables in which case we need
1205 * to remove all the existing tables.
1206 */
1207 if (!tables && stmt->action != AP_SetObjects)
1208 return;
1209
1210 rels = OpenTableList(tables);
1211
1212 if (stmt->action == AP_AddObjects)
1213 {
1214 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1215
1216 publish_schema |= is_schema_publication(pubid);
1217
1218 CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1219 pubform->pubviaroot);
1220
1221 PublicationAddTables(pubid, rels, false, stmt);
1222 }
1223 else if (stmt->action == AP_DropObjects)
1224 PublicationDropTables(pubid, rels, false);
1225 else /* AP_SetObjects */
1226 {
1227 List *oldrelids = GetPublicationRelations(pubid,
1229 List *delrels = NIL;
1230 ListCell *oldlc;
1231
1232 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1233
1234 CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1235 pubform->pubviaroot);
1236
1237 /*
1238 * To recreate the relation list for the publication, look for
1239 * existing relations that do not need to be dropped.
1240 */
1241 foreach(oldlc, oldrelids)
1242 {
1243 Oid oldrelid = lfirst_oid(oldlc);
1244 ListCell *newlc;
1245 PublicationRelInfo *oldrel;
1246 bool found = false;
1247 HeapTuple rftuple;
1248 Node *oldrelwhereclause = NULL;
1249 Bitmapset *oldcolumns = NULL;
1250
1251 /* look up the cache for the old relmap */
1252 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1253 ObjectIdGetDatum(oldrelid),
1254 ObjectIdGetDatum(pubid));
1255
1256 /*
1257 * See if the existing relation currently has a WHERE clause or a
1258 * column list. We need to compare those too.
1259 */
1260 if (HeapTupleIsValid(rftuple))
1261 {
1262 bool isnull = true;
1263 Datum whereClauseDatum;
1264 Datum columnListDatum;
1265
1266 /* Load the WHERE clause for this table. */
1267 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1268 Anum_pg_publication_rel_prqual,
1269 &isnull);
1270 if (!isnull)
1271 oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1272
1273 /* Transform the int2vector column list to a bitmap. */
1274 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1275 Anum_pg_publication_rel_prattrs,
1276 &isnull);
1277
1278 if (!isnull)
1279 oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1280
1281 ReleaseSysCache(rftuple);
1282 }
1283
 /*
  * NOTE(review): pub_collist_validate() below sits inside this inner
  * loop, so it is re-run for every (old relation, new relation) pair
  * until a match breaks out of the loop.
  */
1284 foreach(newlc, rels)
1285 {
1286 PublicationRelInfo *newpubrel;
1287 Oid newrelid;
1288 Bitmapset *newcolumns = NULL;
1289
1290 newpubrel = (PublicationRelInfo *) lfirst(newlc);
1291 newrelid = RelationGetRelid(newpubrel->relation);
1292
1293 /*
1294 * Validate the column list. If the column list or WHERE
1295 * clause changes, then the validation done here will be
1296 * duplicated inside PublicationAddTables(). The validation
1297 * is cheap enough that that seems harmless.
1298 */
1299 newcolumns = pub_collist_validate(newpubrel->relation,
1300 newpubrel->columns);
1301
1302 /*
1303 * Check if any of the new set of relations matches with the
1304 * existing relations in the publication. Additionally, if the
1305 * relation has an associated WHERE clause, check the WHERE
1306 * expressions also match. Same for the column list. Drop the
1307 * rest.
1308 */
1309 if (newrelid == oldrelid)
1310 {
1311 if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1312 bms_equal(oldcolumns, newcolumns))
1313 {
1314 found = true;
1315 break;
1316 }
1317 }
1318 }
1319
1320 /*
1321 * Add the non-matched relations to a list so that they can be
1322 * dropped.
1323 */
1324 if (!found)
1325 {
1326 oldrel = palloc(sizeof(PublicationRelInfo));
1327 oldrel->whereClause = NULL;
1328 oldrel->columns = NIL;
1329 oldrel->relation = table_open(oldrelid,
1331 delrels = lappend(delrels, oldrel);
1332 }
1333 }
1334
1335 /* And drop them. */
1336 PublicationDropTables(pubid, delrels, true);
1337
1338 /*
1339 * Don't bother calculating the difference for adding, we'll catch and
1340 * skip existing ones when doing catalog update.
1341 */
1342 PublicationAddTables(pubid, rels, true, stmt);
1343
 /* Close the relations that were opened above only to be dropped. */
1344 CloseTableList(delrels);
1345 }
1346
1347 CloseTableList(rels);
1348}
1349
1350/*
1351 * Alter the publication schemas.
1352 *
1353 * Add or remove schemas to/from publication.
 *
 * Behaviour depends on stmt->action:
 *   AP_AddObjects  - refuse if any relation already in the publication has
 *                    a non-null prattrs (column list), otherwise add the
 *                    schemas in 'schemaidlist';
 *   AP_DropObjects - drop the schemas in 'schemaidlist';
 *   AP_SetObjects  - drop the current schemas that are not in
 *                    'schemaidlist', then add 'schemaidlist' with
 *                    if_not_exists = true.
 *
 * The schemas in 'schemaidlist' are locked with LockSchemaList() before any
 * of the branches run.
 *
 * NOTE(review): this listing skips source lines 1356, 1359 and 1386: the
 * line carrying the function name and first parameter, one local
 * declaration (presumably 'pubform' -- confirm), and the first key argument
 * of the SearchSysCache2() call below.
1354 */
1355static void
1357 HeapTuple tup, List *schemaidlist)
1358{
1360
1361 /*
1362 * Nothing to do if no objects, except in SET: for that it is quite
1363 * possible that user has not specified any schemas in which case we need
1364 * to remove all the existing schemas.
1365 */
1366 if (!schemaidlist && stmt->action != AP_SetObjects)
1367 return;
1368
1369 /*
1370 * Schema lock is held until the publication is altered to prevent
1371 * concurrent schema deletion.
1372 */
1373 LockSchemaList(schemaidlist);
1374 if (stmt->action == AP_AddObjects)
1375 {
1376 ListCell *lc;
1377 List *reloids;
1378
1379 reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1380
1381 foreach(lc, reloids)
1382 {
1383 HeapTuple coltuple;
1384
1385 coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1387 ObjectIdGetDatum(pubform->oid));
1388
1389 if (!HeapTupleIsValid(coltuple))
1390 continue;
1391
1392 /*
1393 * Disallow adding schema if column list is already part of the
1394 * publication. See CheckPubRelationColumnList.
1395 */
1396 if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1397 ereport(ERROR,
1398 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1399 errmsg("cannot add schema to publication \"%s\"",
1400 stmt->pubname),
1401 errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1402
1403 ReleaseSysCache(coltuple);
1404 }
1405
1406 PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1407 }
1408 else if (stmt->action == AP_DropObjects)
1409 PublicationDropSchemas(pubform->oid, schemaidlist, false);
1410 else /* AP_SetObjects */
1411 {
1412 List *oldschemaids = GetPublicationSchemas(pubform->oid);
1413 List *delschemas = NIL;
1414
1415 /* Identify which schemas should be dropped */
1416 delschemas = list_difference_oid(oldschemaids, schemaidlist);
1417
1418 /*
1419 * Schema lock is held until the publication is altered to prevent
1420 * concurrent schema deletion.
1421 */
1422 LockSchemaList(delschemas);
1423
1424 /* And drop them */
1425 PublicationDropSchemas(pubform->oid, delschemas, true);
1426
1427 /*
1428 * Don't bother calculating the difference for adding, we'll catch and
1429 * skip existing ones when doing catalog update.
1430 */
1431 PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1432 }
1433}
1434
1435/*
1436 * Check if relations and schemas can be in a given publication and throw
1437 * appropriate error if not.
 *
 * Three checks are made, in this order:
 *   1. ADD or SET with a non-empty 'schemaidlist' requires superuser();
 *   2. a non-empty 'schemaidlist' is rejected when pubform->puballtables;
 *   3. a non-empty 'tables' list is rejected when pubform->puballtables.
 * Returns normally when none of them fires.
 *
 * NOTE(review): this listing skips source lines 1440 and 1443: the line
 * carrying the function name and leading parameters, and one local
 * declaration (presumably 'pubform' -- confirm).
1438 */
1439static void
1441 List *tables, List *schemaidlist)
1442{
1444
1445 if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1446 schemaidlist && !superuser())
1447 ereport(ERROR,
1448 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1449 errmsg("must be superuser to add or set schemas")));
1450
1451 /*
1452 * Check that user is allowed to manipulate the publication tables in
1453 * schema
1454 */
1455 if (schemaidlist && pubform->puballtables)
1456 ereport(ERROR,
1457 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1458 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1459 NameStr(pubform->pubname)),
1460 errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1461
1462 /* Check that user is allowed to manipulate the publication tables. */
1463 if (tables && pubform->puballtables)
1464 ereport(ERROR,
1465 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1466 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1467 NameStr(pubform->pubname)),
1468 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1469}
1470
1471/*
1472 * Alter the existing publication.
1473 *
1474 * This is dispatcher function for AlterPublicationOptions,
1475 * AlterPublicationSchemas and AlterPublicationTables.
 *
 * The publication is looked up by stmt->pubname and ownership is checked
 * for the current user.  When stmt->options is set, only
 * AlterPublicationOptions() runs.  Otherwise the object list in
 * stmt->pubobjects is resolved into relations and schema OIDs, validated by
 * CheckAlterPublication(), the publication is locked and re-fetched by OID,
 * and the table list is applied before the schema list.
 *
 * NOTE(review): this listing skips source lines 1478, 1499, 1519 and 1541:
 * the function name and parameter list, the statement under the ownership
 * check, the last LockDatabaseObject() argument, and one statement after
 * the final heap_freetuple().
1476 */
1477void
1479{
1480 Relation rel;
1481 HeapTuple tup;
1482 Form_pg_publication pubform;
1483
1484 rel = table_open(PublicationRelationId, RowExclusiveLock);
1485
1486 tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1487 CStringGetDatum(stmt->pubname));
1488
1489 if (!HeapTupleIsValid(tup))
1490 ereport(ERROR,
1491 (errcode(ERRCODE_UNDEFINED_OBJECT),
1492 errmsg("publication \"%s\" does not exist",
1493 stmt->pubname)));
1494
1495 pubform = (Form_pg_publication) GETSTRUCT(tup);
1496
1497 /* must be owner */
1498 if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1500 stmt->pubname);
1501
1502 if (stmt->options)
1503 AlterPublicationOptions(pstate, stmt, rel, tup);
1504 else
1505 {
1506 List *relations = NIL;
1507 List *schemaidlist = NIL;
 /* Saved now because 'tup' (and thus 'pubform') is freed before the lock. */
1508 Oid pubid = pubform->oid;
1509
1510 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1511 &schemaidlist);
1512
1513 CheckAlterPublication(stmt, tup, relations, schemaidlist);
1514
1515 heap_freetuple(tup);
1516
1517 /* Lock the publication so nobody else can do anything with it. */
1518 LockDatabaseObject(PublicationRelationId, pubid, 0,
1520
1521 /*
1522 * It is possible that by the time we acquire the lock on publication,
1523 * concurrent DDL has removed it. We can test this by checking the
1524 * existence of publication. We get the tuple again to avoid the risk
1525 * of any publication option getting changed.
1526 */
1527 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1528 if (!HeapTupleIsValid(tup))
1529 ereport(ERROR,
1530 errcode(ERRCODE_UNDEFINED_OBJECT),
1531 errmsg("publication \"%s\" does not exist",
1532 stmt->pubname));
1533
1534 AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1535 schemaidlist != NIL);
1536 AlterPublicationSchemas(stmt, tup, schemaidlist);
1537 }
1538
1539 /* Cleanup. */
1540 heap_freetuple(tup);
1542}
1543
1544/*
1545 * Remove relation from publication by mapping OID.
 *
 * 'proid' is used as the PUBLICATIONREL syscache key; a failed lookup is
 * reported with elog(ERROR).  The matching catalog row is deleted from the
 * relation identified by PublicationRelRelationId.
 *
 * NOTE(review): this listing skips source lines 1548, 1552, 1573, 1576 and
 * 1582: the function name and parameter list, the declaration of 'pubrel',
 * the start of the call whose last argument is pubrel->prrelid, the
 * statement that follows it, and one statement before the closing brace.
1546 */
1547void
1549{
1550 Relation rel;
1551 HeapTuple tup;
1553 List *relids = NIL;
1554
1555 rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1556
1557 tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1558
1559 if (!HeapTupleIsValid(tup))
1560 elog(ERROR, "cache lookup failed for publication table %u",
1561 proid);
1562
1563 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1564
1565 /*
1566 * Invalidate relcache so that publication info is rebuilt.
1567 *
1568 * For the partitioned tables, we must invalidate all partitions contained
1569 * in the respective partition hierarchies, not just the one explicitly
1570 * mentioned in the publication. This is required because we implicitly
1571 * publish the child tables when the parent table is published.
1572 */
1574 pubrel->prrelid);
1575
1577
1578 CatalogTupleDelete(rel, &tup->t_self);
1579
1580 ReleaseSysCache(tup);
1581
1583}
1584
1585/*
1586 * Remove the publication by mapping OID.
 *
 * 'pubid' is used as the PUBLICATIONOID syscache key; a failed lookup is
 * reported with elog(ERROR).  The matching catalog row is deleted from the
 * relation identified by PublicationRelationId.
 *
 * NOTE(review): this listing skips source lines 1589, 1605 and 1611: the
 * function name and parameter list, the statement executed when
 * pubform->puballtables is true, and one statement before the closing
 * brace.  As shown, the "if" below would wrongly govern
 * CatalogTupleDelete(); that is an artifact of the missing line.
1587 */
1588void
1590{
1591 Relation rel;
1592 HeapTuple tup;
1593 Form_pg_publication pubform;
1594
1595 rel = table_open(PublicationRelationId, RowExclusiveLock);
1596
1597 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1598 if (!HeapTupleIsValid(tup))
1599 elog(ERROR, "cache lookup failed for publication %u", pubid);
1600
1601 pubform = (Form_pg_publication) GETSTRUCT(tup);
1602
1603 /* Invalidate relcache so that publication info is rebuilt. */
1604 if (pubform->puballtables)
1606
1607 CatalogTupleDelete(rel, &tup->t_self);
1608
1609 ReleaseSysCache(tup);
1610
1612}
1613
1614/*
1615 * Remove schema from publication by mapping OID.
 *
 * 'psoid' is used as the PUBLICATIONNAMESPACE syscache key; a failed lookup
 * is reported with elog(ERROR).  The relations returned by
 * GetSchemaPublicationRelations() for pubsch->pnnspid are passed to
 * InvalidatePublicationRels() before the catalog row is deleted from the
 * relation identified by PublicationNamespaceRelationId.
 *
 * NOTE(review): this listing skips source lines 1618, 1623, 1632, 1640 and
 * 1647: the function name and parameter list, the declaration of 'pubsch',
 * its assignment, the last GetSchemaPublicationRelations() argument, and
 * one statement before the closing brace.
1616 */
1617void
1619{
1620 Relation rel;
1621 HeapTuple tup;
1622 List *schemaRels = NIL;
1624
1625 rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1626
1627 tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1628
1629 if (!HeapTupleIsValid(tup))
1630 elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1631
1633
1634 /*
1635 * Invalidate relcache so that publication info is rebuilt. See
1636 * RemovePublicationRelById for why we need to consider all the
1637 * partitions.
1638 */
1639 schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1641 InvalidatePublicationRels(schemaRels);
1642
1643 CatalogTupleDelete(rel, &tup->t_self);
1644
1645 ReleaseSysCache(tup);
1646
1648}
1649
1650/*
1651 * Open relations specified by a PublicationTable list.
1652 * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1653 * add them to a publication.
 *
 * Returns a List of palloc'd PublicationRelInfo, one per distinct relation.
 * Each entry carries the opened relation plus the whereClause and columns of
 * the list element 't' it came from.  When t->relation->inh is set and the
 * relation is not a partitioned table, its inheritance children are added
 * too, with the parent's whereClause and columns.
 *
 * A relation that appears more than once is kept only once.  The duplicate
 * raises an error if either occurrence has a WHERE clause or a column list;
 * for a child reached through recursion this applies only when
 * childrelid != myrelid.
 *
 * Three OID lists track what has been seen: 'relids' (every relation added),
 * 'relids_with_rf' (those with a WHERE clause) and 'relids_with_collist'
 * (those with a column list).
 *
 * NOTE(review): this listing skips source lines 1656, 1669, 1676, 1678,
 * 1693, 1695, 1700, 1702, 1704, 1732, 1740, 1756, 1758, 1768 and 1770.
 * Among them are the function name and parameter list, the declaration of
 * 't', the statement that opens 'rel', the errcode and message argument of
 * each ereport() below, the statement before the first "continue", and the
 * start of the call that assigns 'children'.
1654 */
1655static List *
1657{
1658 List *relids = NIL;
1659 List *rels = NIL;
1660 ListCell *lc;
1661 List *relids_with_rf = NIL;
1662 List *relids_with_collist = NIL;
1663
1664 /*
1665 * Open, share-lock, and check all the explicitly-specified relations
1666 */
1667 foreach(lc, tables)
1668 {
1670 bool recurse = t->relation->inh;
1671 Relation rel;
1672 Oid myrelid;
1673 PublicationRelInfo *pub_rel;
1674
1675 /* Allow query cancel in case this takes a long time */
1677
1679 myrelid = RelationGetRelid(rel);
1680
1681 /*
1682 * Filter out duplicates if user specifies "foo, foo".
1683 *
1684 * Note that this algorithm is known to not be very efficient (O(N^2))
1685 * but given that it only works on list of tables given to us by user
1686 * it's deemed acceptable.
1687 */
1688 if (list_member_oid(relids, myrelid))
1689 {
1690 /* Disallow duplicate tables if there are any with row filters. */
1691 if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1692 ereport(ERROR,
1694 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1696
1697 /* Disallow duplicate tables if there are any with column lists. */
1698 if (t->columns || list_member_oid(relids_with_collist, myrelid))
1699 ereport(ERROR,
1701 errmsg("conflicting or redundant column lists for table \"%s\"",
1703
1705 continue;
1706 }
1707
1708 pub_rel = palloc(sizeof(PublicationRelInfo));
1709 pub_rel->relation = rel;
1710 pub_rel->whereClause = t->whereClause;
1711 pub_rel->columns = t->columns;
1712 rels = lappend(rels, pub_rel);
1713 relids = lappend_oid(relids, myrelid);
1714
1715 if (t->whereClause)
1716 relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1717
1718 if (t->columns)
1719 relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1720
1721 /*
1722 * Add children of this rel, if requested, so that they too are added
1723 * to the publication. A partitioned table can't have any inheritance
1724 * children other than its partitions, which need not be explicitly
1725 * added to the publication.
1726 */
1727 if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1728 {
1729 List *children;
1730 ListCell *child;
1731
1733 NULL);
1734
1735 foreach(child, children)
1736 {
1737 Oid childrelid = lfirst_oid(child);
1738
1739 /* Allow query cancel in case this takes a long time */
1741
1742 /*
1743 * Skip duplicates if user specified both parent and child
1744 * tables.
1745 */
1746 if (list_member_oid(relids, childrelid))
1747 {
1748 /*
1749 * We don't allow to specify row filter for both parent
1750 * and child table at the same time as it is not very
1751 * clear which one should be given preference.
1752 */
1753 if (childrelid != myrelid &&
1754 (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1755 ereport(ERROR,
1757 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1759
1760 /*
1761 * We don't allow to specify column list for both parent
1762 * and child table at the same time as it is not very
1763 * clear which one should be given preference.
1764 */
1765 if (childrelid != myrelid &&
1766 (t->columns || list_member_oid(relids_with_collist, childrelid)))
1767 ereport(ERROR,
1769 errmsg("conflicting or redundant column lists for table \"%s\"",
1771
1772 continue;
1773 }
1774
1775 /* find_all_inheritors already got lock */
1776 rel = table_open(childrelid, NoLock);
1777 pub_rel = palloc(sizeof(PublicationRelInfo));
1778 pub_rel->relation = rel;
1779 /* child inherits WHERE clause from parent */
1780 pub_rel->whereClause = t->whereClause;
1781
1782 /* child inherits column list from parent */
1783 pub_rel->columns = t->columns;
1784 rels = lappend(rels, pub_rel);
1785 relids = lappend_oid(relids, childrelid);
1786
1787 if (t->whereClause)
1788 relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1789
1790 if (t->columns)
1791 relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1792 }
1793 }
1794 }
1795
 /*
  * NOTE(review): relids_with_collist is not passed to list_free() here,
  * unlike the other two bookkeeping lists.
  */
1796 list_free(relids);
1797 list_free(relids_with_rf);
1798
1799 return rels;
1800}
1801
1802/*
1803 * Close all relations in the list.
1804 */
1805static void
1807{
1808 ListCell *lc;
1809
1810 foreach(lc, rels)
1811 {
1812 PublicationRelInfo *pub_rel;
1813
1814 pub_rel = (PublicationRelInfo *) lfirst(lc);
1815 table_close(pub_rel->relation, NoLock);
1816 }
1817
1818 list_free_deep(rels);
1819}
1820
1821/*
1822 * Lock the schemas specified in the schema list in AccessShareLock mode in
1823 * order to prevent concurrent schema deletion.
1824 */
1825static void
1827{
1828 ListCell *lc;
1829
1830 foreach(lc, schemalist)
1831 {
1832 Oid schemaid = lfirst_oid(lc);
1833
1834 /* Allow query cancel in case this takes a long time */
1836 LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1837
1838 /*
1839 * It is possible that by the time we acquire the lock on schema,
1840 * concurrent DDL has removed it. We can test this by checking the
1841 * existence of schema.
1842 */
1843 if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1844 ereport(ERROR,
1845 errcode(ERRCODE_UNDEFINED_SCHEMA),
1846 errmsg("schema with OID %u does not exist", schemaid));
1847 }
1848}
1849
1850/*
1851 * Add listed tables to the publication.
 *
 * 'rels' is a List of PublicationRelInfo.  For each entry the current user
 * is checked for ownership of the relation, then publication_add_relation()
 * is called with 'pubid' and 'if_not_exists'.  When 'stmt' is non-NULL the
 * post-create hook is invoked for the returned object; 'stmt' may be NULL,
 * in which case that step is skipped.
 *
 * NOTE(review): this listing skips source lines 1855, 1869, 1870 and 1875:
 * the last parameter (presumably 'stmt' -- confirm its type), the statement
 * under the ownership check, and the start of the call whose final argument
 * is "(Node *) stmt".
1852 */
1853static void
1854PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1856{
1857 ListCell *lc;
1858
1859 Assert(!stmt || !stmt->for_all_tables);
1860
1861 foreach(lc, rels)
1862 {
1863 PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1864 Relation rel = pub_rel->relation;
1865 ObjectAddress obj;
1866
1867 /* Must be owner of the table or superuser. */
1868 if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1871
1872 obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1873 if (stmt)
1874 {
1876 (Node *) stmt);
1877
1878 InvokeObjectPostCreateHook(PublicationRelRelationId,
1879 obj.objectId, 0);
1880 }
1881 }
1882}
1883
1884/*
1885 * Remove listed tables from the publication.
 *
 * 'rels' is a List of PublicationRelInfo.  For each entry, in this order:
 *   - a column list is rejected with ERRCODE_SYNTAX_ERROR;
 *   - the mapping row OID is looked up in PUBLICATIONRELMAP by
 *     (relid, pubid); if there is none, the entry is skipped when
 *     'missing_ok' is true and ERRCODE_UNDEFINED_OBJECT is raised otherwise;
 *   - a WHERE clause is rejected with ERRCODE_SYNTAX_ERROR;
 *   - the mapping row is removed via performDeletion() with DROP_CASCADE.
 * Because the WHERE clause check comes after the lookup, an entry skipped
 * through 'missing_ok' is never checked for a WHERE clause.
 *
 * NOTE(review): this listing skips source lines 1896 and 1916: the
 * declaration of 'pubrel' and the argument supplied for the "%s" in the
 * "is not part of the publication" message.
1886 */
1887static void
1888PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1889{
1890 ObjectAddress obj;
1891 ListCell *lc;
1892 Oid prid;
1893
1894 foreach(lc, rels)
1895 {
1897 Relation rel = pubrel->relation;
1898 Oid relid = RelationGetRelid(rel);
1899
1900 if (pubrel->columns)
1901 ereport(ERROR,
1902 errcode(ERRCODE_SYNTAX_ERROR),
1903 errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1904
1905 prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1906 ObjectIdGetDatum(relid),
1907 ObjectIdGetDatum(pubid));
1908 if (!OidIsValid(prid))
1909 {
1910 if (missing_ok)
1911 continue;
1912
1913 ereport(ERROR,
1914 (errcode(ERRCODE_UNDEFINED_OBJECT),
1915 errmsg("relation \"%s\" is not part of the publication",
1917 }
1918
1919 if (pubrel->whereClause)
1920 ereport(ERROR,
1921 (errcode(ERRCODE_SYNTAX_ERROR),
1922 errmsg("cannot use a WHERE clause when removing a table from a publication")));
1923
1924 ObjectAddressSet(obj, PublicationRelRelationId, prid);
1925 performDeletion(&obj, DROP_CASCADE, 0);
1926 }
1927}
1928
1929/*
1930 * Add listed schemas to the publication.
 *
 * 'schemas' is a List of schema OIDs.  publication_add_schema() is called
 * for each with 'pubid' and 'if_not_exists'.  When 'stmt' is non-NULL the
 * post-create hook is invoked for the returned object; 'stmt' may be NULL,
 * in which case that step is skipped.
 *
 * NOTE(review): this listing skips source lines 1934 and 1948: the last
 * parameter (presumably 'stmt' -- confirm its type) and the start of the
 * call whose final argument is "(Node *) stmt".
1931 */
1932static void
1933PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1935{
1936 ListCell *lc;
1937
1938 Assert(!stmt || !stmt->for_all_tables);
1939
1940 foreach(lc, schemas)
1941 {
1942 Oid schemaid = lfirst_oid(lc);
1943 ObjectAddress obj;
1944
1945 obj = publication_add_schema(pubid, schemaid, if_not_exists);
1946 if (stmt)
1947 {
1949 (Node *) stmt);
1950
1951 InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1952 obj.objectId, 0);
1953 }
1954 }
1955}
1956
1957/*
1958 * Remove listed schemas from the publication.
1959 */
1960static void
1961PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1962{
1963 ObjectAddress obj;
1964 ListCell *lc;
1965 Oid psid;
1966
1967 foreach(lc, schemas)
1968 {
1969 Oid schemaid = lfirst_oid(lc);
1970
1971 psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
1972 Anum_pg_publication_namespace_oid,
1973 ObjectIdGetDatum(schemaid),
1974 ObjectIdGetDatum(pubid));
1975 if (!OidIsValid(psid))
1976 {
1977 if (missing_ok)
1978 continue;
1979
1980 ereport(ERROR,
1981 (errcode(ERRCODE_UNDEFINED_OBJECT),
1982 errmsg("tables from schema \"%s\" are not part of the publication",
1983 get_namespace_name(schemaid))));
1984 }
1985
1986 ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1987 performDeletion(&obj, DROP_CASCADE, 0);
1988 }
1989}
1990
1991/*
1992 * Internal workhorse for changing a publication owner
 *
 * Returns without doing anything when form->pubowner already equals
 * newOwnerId.  Unless the current user is a superuser, these checks are
 * made first:
 *   - the current user must own the publication;
 *   - check_can_set_role(GetUserId(), newOwnerId);
 *   - the new owner must have ACL_CREATE on the current database;
 *   - the new owner must be a superuser if the publication is
 *     puballtables or is_schema_publication().
 * The owner is then changed in place on 'tup', written back with
 * CatalogTupleUpdate(), and the owner dependency is updated.
 *
 * NOTE(review): this listing skips source lines 1995, 1997, 2010, 2019 and
 * 2020: the function name and parameter list, the declaration of 'form',
 * and the statements run when the ownership check and the ACL_CREATE check
 * fail.
1993 */
1994static void
1996{
1998
1999 form = (Form_pg_publication) GETSTRUCT(tup);
2000
2001 if (form->pubowner == newOwnerId)
2002 return;
2003
2004 if (!superuser())
2005 {
2006 AclResult aclresult;
2007
2008 /* Must be owner */
2009 if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
2011 NameStr(form->pubname));
2012
2013 /* Must be able to become new owner */
2014 check_can_set_role(GetUserId(), newOwnerId);
2015
2016 /* New owner must have CREATE privilege on database */
2017 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
2018 if (aclresult != ACLCHECK_OK)
2021
2022 if (form->puballtables && !superuser_arg(newOwnerId))
2023 ereport(ERROR,
2024 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2025 errmsg("permission denied to change owner of publication \"%s\"",
2026 NameStr(form->pubname)),
2027 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
2028
2029 if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
2030 ereport(ERROR,
2031 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2032 errmsg("permission denied to change owner of publication \"%s\"",
2033 NameStr(form->pubname)),
2034 errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
2035 }
2036
2037 form->pubowner = newOwnerId;
2038 CatalogTupleUpdate(rel, &tup->t_self, tup);
2039
2040 /* Update owner dependency reference */
2041 changeDependencyOnOwner(PublicationRelationId,
2042 form->oid,
2043 newOwnerId);
2044
2045 InvokeObjectPostAlterHook(PublicationRelationId,
2046 form->oid, 0);
2047}
2048
2049/*
2050 * Change publication owner -- by name
 *
 * Looks up a copy of the publication tuple by 'name' (raising
 * ERRCODE_UNDEFINED_OBJECT if there is none), delegates the ownership
 * change to AlterPublicationOwner_internal(), and returns an object address
 * set from PublicationRelationId and the publication's OID.
 *
 * NOTE(review): this listing skips source lines 2052 and 2079: the
 * return-type line (the function returns 'address', declared as
 * ObjectAddress) and one statement between heap_freetuple() and the return.
2051 */
2053AlterPublicationOwner(const char *name, Oid newOwnerId)
2054{
2055 Oid pubid;
2056 HeapTuple tup;
2057 Relation rel;
2058 ObjectAddress address;
2059 Form_pg_publication pubform;
2060
2061 rel = table_open(PublicationRelationId, RowExclusiveLock);
2062
2063 tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2064
2065 if (!HeapTupleIsValid(tup))
2066 ereport(ERROR,
2067 (errcode(ERRCODE_UNDEFINED_OBJECT),
2068 errmsg("publication \"%s\" does not exist", name)));
2069
2070 pubform = (Form_pg_publication) GETSTRUCT(tup);
 /* Saved before the tuple is freed below. */
2071 pubid = pubform->oid;
2072
2073 AlterPublicationOwner_internal(rel, tup, newOwnerId);
2074
2075 ObjectAddressSet(address, PublicationRelationId, pubid);
2076
2077 heap_freetuple(tup);
2078
2080
2081 return address;
2082}
2083
2084/*
2085 * Change publication owner -- by OID
 *
 * Looks up a copy of the publication tuple by 'pubid' (raising
 * ERRCODE_UNDEFINED_OBJECT if there is none) and delegates the ownership
 * change to AlterPublicationOwner_internal().
 *
 * NOTE(review): this listing skips source lines 2088 and 2106: the function
 * name and parameter list, and one statement between heap_freetuple() and
 * the closing brace.
2086 */
2087void
2089{
2090 HeapTuple tup;
2091 Relation rel;
2092
2093 rel = table_open(PublicationRelationId, RowExclusiveLock);
2094
2095 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
2096
2097 if (!HeapTupleIsValid(tup))
2098 ereport(ERROR,
2099 (errcode(ERRCODE_UNDEFINED_OBJECT),
2100 errmsg("publication with OID %u does not exist", pubid)));
2101
2102 AlterPublicationOwner_internal(rel, tup, newOwnerId);
2103
2104 heap_freetuple(tup);
2105
2107}
2108
2109/*
2110 * Extract the publish_generated_columns option value from a DefElem. "stored"
2111 * and "none" values are accepted.
 *
 * Returns PUBLISH_GENCOLS_STORED when def->arg is absent.  Otherwise the
 * string from defGetString() is compared case-insensitively: "none" gives
 * PUBLISH_GENCOLS_NONE and "stored" gives PUBLISH_GENCOLS_STORED.  Any other
 * string raises ERRCODE_SYNTAX_ERROR with def->defname in the message.
 *
 * NOTE(review): this listing skips source line 2114, which carries the
 * function name and parameter list.
2112 */
2113static char
2115{
2116 char *sval;
2117
2118 /*
2119 * If no parameter value given, assume "stored" is meant.
2120 */
2121 if (!def->arg)
2122 return PUBLISH_GENCOLS_STORED;
2123
2124 sval = defGetString(def);
2125
2126 if (pg_strcasecmp(sval, "none") == 0)
2127 return PUBLISH_GENCOLS_NONE;
2128 if (pg_strcasecmp(sval, "stored") == 0)
2129 return PUBLISH_GENCOLS_STORED;
2130
2131 ereport(ERROR,
2132 errcode(ERRCODE_SYNTAX_ERROR),
2133 errmsg("%s requires a \"none\" or \"stored\" value",
2134 def->defname));
2135
2136 return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
2137}
void check_can_set_role(Oid member, Oid role)
Definition: acl.c:5325
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
@ ACLCHECK_NOT_OWNER
Definition: acl.h:185
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2639
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition: aclchk.c:3821
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition: aclchk.c:4075
int16 AttrNumber
Definition: attnum.h:21
#define InvalidAttrNumber
Definition: attnum.h:23
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:142
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1306
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define TextDatumGetCString(d)
Definition: builtins.h:98
#define NameStr(name)
Definition: c.h:717
#define OidIsValid(objectId)
Definition: c.h:746
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:450
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3188
char * defGetString(DefElem *def)
Definition: define.c:35
bool defGetBoolean(DefElem *def)
Definition: define.c:94
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition: define.c:371
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:273
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1231
int errdetail(const char *fmt,...)
Definition: elog.c:1204
int errhint(const char *fmt,...)
Definition: elog.c:1318
int errcode(int sqlerrcode)
Definition: elog.c:854
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define _(x)
Definition: elog.c:91
#define WARNING
Definition: elog.h:36
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:682
Oid MyDatabaseId
Definition: globals.c:95
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
Definition: heaptuple.c:456
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
#define stmt
Definition: indent_codes.h:59
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
void CacheInvalidateRelSyncAll(void)
Definition: inval.c:1720
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1687
void CacheInvalidateRelSync(Oid relid)
Definition: inval.c:1708
void CacheInvalidateRelcacheAll(void)
Definition: inval.c:1654
int x
Definition: isn.c:75
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:81
List * list_concat_unique_oid(List *list1, const List *list2)
Definition: list.c:1469
List * lappend(List *list, void *datum)
Definition: list.c:339
List * list_difference_oid(const List *list1, const List *list2)
Definition: list.c:1313
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
List * list_append_unique_oid(List *list, Oid datum)
Definition: list.c:1380
void list_free(List *list)
Definition: list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
void list_free_deep(List *list)
Definition: list.c:1560
void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1002
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2068
AttrNumber get_attnum(Oid relid, const char *attname)
Definition: lsyscache.c:950
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2143
char func_volatile(Oid funcid)
Definition: lsyscache.c:1920
char * get_attname(Oid relid, AttrNumber attnum, bool missing_ok)
Definition: lsyscache.c:919
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3506
void * palloc(Size size)
Definition: mcxt.c:1943
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
Oid GetUserId(void)
Definition: miscinit.c:520
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
List * fetch_search_path(bool includeImplicit)
Definition: namespace.c:4819
Oid get_namespace_oid(const char *nspname, bool missing_ok)
Definition: namespace.c:3535
Oid exprType(const Node *expr)
Definition: nodeFuncs.c:42
Oid exprInputCollation(const Node *expr)
Definition: nodeFuncs.c:1076
bool check_functions_in_node(Node *node, check_function_callback checker, void *context)
Definition: nodeFuncs.c:1910
Oid exprCollation(const Node *expr)
Definition: nodeFuncs.c:821
int exprLocation(const Node *expr)
Definition: nodeFuncs.c:1388
#define expression_tree_walker(n, w, c)
Definition: nodeFuncs.h:153
#define IsA(nodeptr, _type_)
Definition: nodes.h:164
#define copyObject(obj)
Definition: nodes.h:230
#define nodeTag(nodeptr)
Definition: nodes.h:139
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:173
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:197
ObjectType get_relkind_objtype(char relkind)
const ObjectAddress InvalidObjectAddress
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
Node * transformWhereClause(ParseState *pstate, Node *clause, ParseExprKind exprKind, const char *constructName)
void assign_expr_collations(ParseState *pstate, Node *expr)
void free_parsestate(ParseState *pstate)
Definition: parse_node.c:72
int parser_errposition(ParseState *pstate, int location)
Definition: parse_node.c:106
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:39
@ EXPR_KIND_WHERE
Definition: parse_node.h:46
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
void addNSItemToQuery(ParseState *pstate, ParseNamespaceItem *nsitem, bool addToJoinList, bool addToRelNameSpace, bool addToVarNameSpace)
@ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA
Definition: parsenodes.h:4242
@ PUBLICATIONOBJ_TABLES_IN_SCHEMA
Definition: parsenodes.h:4241
@ PUBLICATIONOBJ_TABLE
Definition: parsenodes.h:4240
@ AP_DropObjects
Definition: parsenodes.h:4268
@ AP_SetObjects
Definition: parsenodes.h:4269
@ AP_AddObjects
Definition: parsenodes.h:4267
@ DROP_CASCADE
Definition: parsenodes.h:2391
@ OBJECT_DATABASE
Definition: parsenodes.h:2326
@ OBJECT_PUBLICATION
Definition: parsenodes.h:2347
#define ACL_CREATE
Definition: parsenodes.h:85
int16 attnum
Definition: pg_attribute.h:74
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
NameData relname
Definition: pg_class.h:38
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:255
#define lfirst(lc)
Definition: pg_list.h:172
#define lfirst_node(type, lc)
Definition: pg_list.h:176
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define foreach_oid(var, lst)
Definition: pg_list.h:471
#define linitial_oid(l)
Definition: pg_list.h:180
#define lfirst_oid(lc)
Definition: pg_list.h:174
Bitmapset * pub_collist_validate(Relation targetrel, List *columns)
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
bool is_schema_publication(Oid pubid)
ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
List * GetPublicationSchemas(Oid pubid)
ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists)
List * GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
Publication * GetPublication(Oid pubid)
bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols)
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
FormData_pg_publication * Form_pg_publication
@ PUBLICATION_PART_ROOT
@ PUBLICATION_PART_ALL
FormData_pg_publication_namespace * Form_pg_publication_namespace
FormData_pg_publication_rel * Form_pg_publication_rel
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:316
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:168
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
uintptr_t Datum
Definition: postgres.h:69
static Datum BoolGetDatum(bool X)
Definition: postgres.h:107
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:257
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:355
static Datum CharGetDatum(char X)
Definition: postgres.h:127
unsigned int Oid
Definition: postgres_ext.h:30
struct rf_context rf_context
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt)
static bool contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
bool pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot, char pubgencols_type, bool *invalid_column_list, bool *invalid_gen_col)
static void AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, List *schemaidlist)
void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
void InvalidatePublicationRels(List *relids)
static void CloseTableList(List *rels)
void RemovePublicationSchemaById(Oid psoid)
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
static void TransformPubWhereClauses(List *tables, const char *queryString, bool pubviaroot)
ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
void InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, List **rels, List **schemas)
static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist)
static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context)
void RemovePublicationById(Oid pubid)
static List * OpenTableList(List *tables)
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
static bool check_simple_rowfilter_expr(Node *node, ParseState *pstate)
static char defGetGeneratedColsOption(DefElem *def)
void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root, bool *publish_generated_columns_given, char *publish_generated_columns)
void RemovePublicationRelById(Oid proid)
ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId)
static void CheckPubRelationColumnList(char *pubname, List *tables, bool publish_schema, bool pubviaroot)
static void AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, const char *queryString, bool publish_schema)
static void LockSchemaList(List *schemalist)
static bool check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
#define MAX_RELCACHE_INVAL_MSGS
void * stringToNode(const char *str)
Definition: read.c:90
#define RelationGetRelid(relation)
Definition: rel.h:516
#define RelationGetDescr(relation)
Definition: rel.h:542
#define RelationGetRelationName(relation)
Definition: rel.h:550
#define RelationGetNamespace(relation)
Definition: rel.h:557
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition: relcache.c:5300
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
Definition: relcache.h:71
Node * expand_generated_columns_in_expr(Node *node, Relation rel, int rt_index)
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
char * defname
Definition: parsenodes.h:826
Node * arg
Definition: parsenodes.h:827
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
Definition: nodes.h:135
const char * p_sourcetext
Definition: parse_node.h:209
PublicationObjSpecType pubobjtype
Definition: parsenodes.h:4250
PublicationTable * pubtable
Definition: parsenodes.h:4252
RangeVar * relation
Definition: parsenodes.h:4230
bool inh
Definition: primnodes.h:86
TupleDesc rd_att
Definition: rel.h:112
Form_pg_class rd_rel
Definition: rel.h:111
bool has_generated_virtual
Definition: tupdesc.h:47
bool has_generated_stored
Definition: tupdesc.h:46
TupleConstr * constr
Definition: tupdesc.h:141
Definition: primnodes.h:262
AttrNumber varattno
Definition: primnodes.h:274
Bitmapset * bms_replident
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
bool superuser(void)
Definition: superuser.c:46
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:221
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:600
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:232
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91
#define SearchSysCacheExists1(cacheId, key1)
Definition: syscache.h:100
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:109
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: table.c:83
#define FirstNormalObjectId
Definition: transam.h:197
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3525
const char * name
void CommandCounterIncrement(void)
Definition: xact.c:1100
int wal_level
Definition: xlog.c:131
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:76