PostgreSQL Source Code git master
publicationcmds.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * publicationcmds.c
4 * publication manipulation
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/commands/publicationcmds.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/htup_details.h"
18#include "access/table.h"
19#include "access/xact.h"
20#include "catalog/catalog.h"
21#include "catalog/indexing.h"
22#include "catalog/namespace.h"
25#include "catalog/pg_database.h"
26#include "catalog/pg_inherits.h"
28#include "catalog/pg_proc.h"
32#include "commands/defrem.h"
35#include "miscadmin.h"
36#include "nodes/nodeFuncs.h"
37#include "parser/parse_clause.h"
41#include "storage/lmgr.h"
42#include "utils/acl.h"
43#include "utils/builtins.h"
44#include "utils/inval.h"
45#include "utils/lsyscache.h"
46#include "utils/rel.h"
47#include "utils/syscache.h"
48#include "utils/varlena.h"
49
50
51/*
52 * Information used to validate the columns in the row filter expression. See
53 * contain_invalid_rfcolumn_walker for details.
 *
 * NOTE(review): listing line 62, which should close this typedef, is absent
 * from this listing; restore it from upstream before building.
54 */
55typedef struct rf_context
56{
57 Bitmapset *bms_replident; /* bitset of replica identity columns */
58 bool pubviaroot; /* true if we are validating the parent
59 * relation's row filter */
60 Oid relid; /* relid of the relation */
61 Oid parentid; /* relid of the parent relation */
63
64static List *OpenTableList(List *tables);
65static void CloseTableList(List *rels);
66static void LockSchemaList(List *schemalist);
67static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
69static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
72static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73static char defGetGeneratedColsOption(DefElem *def);
74
75
/*
 * Parse the option list of a publication command into the caller's output
 * variables.
 *
 * Every *_given flag is cleared first and the outputs are set to their
 * defaults: all four actions enabled, *publish_via_partition_root false and
 * *publish_generated_columns PUBLISH_GENCOLS_NONE.  The recognized option
 * names are "publish", "publish_via_partition_root" and
 * "publish_generated_columns"; a repeated option is reported through
 * errorConflictingDefElem().
 *
 * NOTE(review): listing lines 77-78 (the function name and leading
 * parameters, presumably including the "pstate" and "options" used below),
 * 132, 151 and 172 are absent from this listing, so the signature and three
 * error-reporting statements are incomplete.  Restore them from upstream.
 */
76static void
79 bool *publish_given,
80 PublicationActions *pubactions,
81 bool *publish_via_partition_root_given,
82 bool *publish_via_partition_root,
83 bool *publish_generated_columns_given,
84 char *publish_generated_columns)
85{
86 ListCell *lc;
87
88 *publish_given = false;
89 *publish_via_partition_root_given = false;
90 *publish_generated_columns_given = false;
91
92 /* defaults */
93 pubactions->pubinsert = true;
94 pubactions->pubupdate = true;
95 pubactions->pubdelete = true;
96 pubactions->pubtruncate = true;
97 *publish_via_partition_root = false;
98 *publish_generated_columns = PUBLISH_GENCOLS_NONE;
99
100 /* Parse options */
101 foreach(lc, options)
102 {
103 DefElem *defel = (DefElem *) lfirst(lc);
104
105 if (strcmp(defel->defname, "publish") == 0)
106 {
107 char *publish;
108 List *publish_list;
109 ListCell *lc2;
110
111 if (*publish_given)
112 errorConflictingDefElem(defel, pstate);
113
114 /*
115 * If publish option was given only the explicitly listed actions
116 * should be published.
117 */
118 pubactions->pubinsert = false;
119 pubactions->pubupdate = false;
120 pubactions->pubdelete = false;
121 pubactions->pubtruncate = false;
122
123 *publish_given = true;
124
125 /*
126 * SplitIdentifierString destructively modifies its input, so make
127 * a copy so we don't modify the memory of the executing statement
128 */
129 publish = pstrdup(defGetString(defel));
130
131 if (!SplitIdentifierString(publish, ',', &publish_list))
133 (errcode(ERRCODE_SYNTAX_ERROR),
134 errmsg("invalid list syntax in parameter \"%s\"",
135 "publish")));
136
137 /* Process the option list. */
138 foreach(lc2, publish_list)
139 {
140 char *publish_opt = (char *) lfirst(lc2);
141
/* Only these four action names are accepted; matching is exact. */
142 if (strcmp(publish_opt, "insert") == 0)
143 pubactions->pubinsert = true;
144 else if (strcmp(publish_opt, "update") == 0)
145 pubactions->pubupdate = true;
146 else if (strcmp(publish_opt, "delete") == 0)
147 pubactions->pubdelete = true;
148 else if (strcmp(publish_opt, "truncate") == 0)
149 pubactions->pubtruncate = true;
150 else
152 (errcode(ERRCODE_SYNTAX_ERROR),
153 errmsg("unrecognized value for publication option \"%s\": \"%s\"",
154 "publish", publish_opt)));
155 }
156 }
157 else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
158 {
159 if (*publish_via_partition_root_given)
160 errorConflictingDefElem(defel, pstate);
161 *publish_via_partition_root_given = true;
162 *publish_via_partition_root = defGetBoolean(defel);
163 }
164 else if (strcmp(defel->defname, "publish_generated_columns") == 0)
165 {
166 if (*publish_generated_columns_given)
167 errorConflictingDefElem(defel, pstate);
168 *publish_generated_columns_given = true;
169 *publish_generated_columns = defGetGeneratedColsOption(defel);
170 }
171 else
173 (errcode(ERRCODE_SYNTAX_ERROR),
174 errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
175 }
176}
177
178/*
179 * Convert the PublicationObjSpecType list into schema oid list and
180 * PublicationTable list.
 *
 * Table entries are appended to *rels unchanged (pubobj->pubtable); schema
 * entries are resolved to OIDs and appended to *schemas with duplicates
 * removed.  An empty input list leaves both outputs untouched.
 *
 * NOTE(review): listing lines 183 (function name and leading parameters),
 * 201, 204 and 210 (presumably the case labels of the switch below) and 213
 * (the opener of the error report) are absent from this listing.  Restore
 * them from upstream.
181 */
182static void
184 List **rels, List **schemas)
185{
186 ListCell *cell;
187 PublicationObjSpec *pubobj;
188
189 if (!pubobjspec_list)
190 return;
191
192 foreach(cell, pubobjspec_list)
193 {
194 Oid schemaid;
195 List *search_path;
196
197 pubobj = (PublicationObjSpec *) lfirst(cell);
198
199 switch (pubobj->pubobjtype)
200 {
202 *rels = lappend(*rels, pubobj->pubtable);
203 break;
/* Named schema: the lookup is called with missing_ok = false. */
205 schemaid = get_namespace_oid(pubobj->name, false);
206
207 /* Filter out duplicates if user specifies "sch1, sch1" */
208 *schemas = list_append_unique_oid(*schemas, schemaid);
209 break;
/* Current schema: take the first entry of the active search path. */
211 search_path = fetch_search_path(false);
212 if (search_path == NIL) /* nothing valid in search_path? */
214 errcode(ERRCODE_UNDEFINED_SCHEMA),
215 errmsg("no schema has been selected for CURRENT_SCHEMA"));
216
217 schemaid = linitial_oid(search_path);
218 list_free(search_path);
219
220 /* Filter out duplicates if user specifies "sch1, sch1" */
221 *schemas = list_append_unique_oid(*schemas, schemaid);
222 break;
223 default:
224 /* shouldn't happen */
225 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
226 break;
227 }
228 }
229}
230
231/*
232 * Returns true if any of the columns used in the row filter WHERE expression is
233 * not part of REPLICA IDENTITY, false otherwise.
 *
 * A NULL node yields false.  For a Var node the column number is checked
 * against context->bms_replident, after being remapped by column name from
 * context->parentid to context->relid when context->pubviaroot is set.
 *
 * NOTE(review): listing lines 236 (function name and parameters), 244
 * (presumably the declaration of "attnum" used below), 259 (start of the
 * membership test) and 264 (start of the final return expression) are absent
 * from this listing.  Restore them from upstream.
234 */
235static bool
237{
238 if (node == NULL)
239 return false;
240
241 if (IsA(node, Var))
242 {
243 Var *var = (Var *) node;
245
246 /*
247 * If pubviaroot is true, we are validating the row filter of the
248 * parent table, but the bitmap contains the replica identity
249 * information of the child table. So, get the column number of the
250 * child table as parent and child column order could be different.
251 */
252 if (context->pubviaroot)
253 {
254 char *colname = get_attname(context->parentid, attnum, false);
255
256 attnum = get_attnum(context->relid, colname);
257 }
258
260 context->bms_replident))
261 return true;
262 }
263
265 context);
266}
267
268/*
269 * Check if all columns referenced in the filter expression are part of the
270 * REPLICA IDENTITY index or not.
271 *
272 * Returns true if any invalid column is found.
 *
 * Returns false without further checks when the relation's replica identity
 * is FULL, when no PUBLICATIONRELMAP syscache entry exists for the relation
 * being published as, or when that entry has no row filter (prqual is null).
 *
 * NOTE(review): listing lines 275 (function name and leading parameters,
 * presumably including the "pubid", "relation" and "ancestors" used below)
 * and 332 (the remaining argument of RelationGetIndexAttrBitmap) are absent
 * from this listing.  Restore them from upstream.
273 */
274bool
276 bool pubviaroot)
277{
278 HeapTuple rftuple;
279 Oid relid = RelationGetRelid(relation);
280 Oid publish_as_relid = RelationGetRelid(relation);
281 bool result = false;
282 Datum rfdatum;
283 bool rfisnull;
284
285 /*
286 * FULL means all columns are in the REPLICA IDENTITY, so all columns are
287 * allowed in the row filter and we can skip the validation.
288 */
289 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
290 return false;
291
292 /*
293 * For a partition, if pubviaroot is true, find the topmost ancestor that
294 * is published via this publication as we need to use its row filter
295 * expression to filter the partition's changes.
296 *
297 * Note that even though the row filter used is for an ancestor, the
298 * REPLICA IDENTITY used will be for the actual child table.
299 */
300 if (pubviaroot && relation->rd_rel->relispartition)
301 {
302 publish_as_relid
303 = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
304
/* No published ancestor found: fall back to the relation itself. */
305 if (!OidIsValid(publish_as_relid))
306 publish_as_relid = relid;
307 }
308
309 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
310 ObjectIdGetDatum(publish_as_relid),
311 ObjectIdGetDatum(pubid));
312
313 if (!HeapTupleIsValid(rftuple))
314 return false;
315
316 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
317 Anum_pg_publication_rel_prqual,
318 &rfisnull);
319
320 if (!rfisnull)
321 {
322 rf_context context = {0};
323 Node *rfnode;
324 Bitmapset *bms = NULL;
325
326 context.pubviaroot = pubviaroot;
327 context.parentid = publish_as_relid;
328 context.relid = relid;
329
330 /* Remember columns that are part of the REPLICA IDENTITY */
331 bms = RelationGetIndexAttrBitmap(relation,
333
334 context.bms_replident = bms;
335 rfnode = stringToNode(TextDatumGetCString(rfdatum));
336 result = contain_invalid_rfcolumn_walker(rfnode, &context);
337 }
338
339 ReleaseSysCache(rftuple);
340
341 return result;
342}
343
344/*
345 * Check for invalid columns in the publication table definition.
346 *
347 * This function evaluates two conditions:
348 *
349 * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
350 * by the column list. If any column is missing, *invalid_column_list is set
351 * to true.
352 * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
353 * are published, either by being explicitly named in the column list or, if
354 * no column list is specified, by setting the option
355 * publish_generated_columns to stored. If any unpublished
356 * generated column is found, *invalid_gen_col is set to true.
357 *
358 * Returns true if any of the above conditions are not met.
 *
 * Both output flags are reset to false on entry, so callers need not
 * initialize them.
 *
 * NOTE(review): listing lines 409 and 417 (the last operand of each
 * generated-column test in the REPLICA IDENTITY FULL branch), 426 (the
 * remaining argument of RelationGetIndexAttrBitmap) and 437 (presumably the
 * declaration of "attnum" derived from the bitmap member "x") are absent
 * from this listing.  Restore them from upstream.
359 */
360bool
361pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
362 bool pubviaroot, char pubgencols_type,
363 bool *invalid_column_list,
364 bool *invalid_gen_col)
365{
366 Oid relid = RelationGetRelid(relation);
367 Oid publish_as_relid = RelationGetRelid(relation);
368 Bitmapset *idattrs;
369 Bitmapset *columns = NULL;
370 TupleDesc desc = RelationGetDescr(relation);
371 Publication *pub;
372 int x;
373
374 *invalid_column_list = false;
375 *invalid_gen_col = false;
376
377 /*
378 * For a partition, if pubviaroot is true, find the topmost ancestor that
379 * is published via this publication as we need to use its column list for
380 * the changes.
381 *
382 * Note that even though the column list used is for an ancestor, the
383 * REPLICA IDENTITY used will be for the actual child table.
384 */
385 if (pubviaroot && relation->rd_rel->relispartition)
386 {
387 publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
388
/* No published ancestor found: fall back to the relation itself. */
389 if (!OidIsValid(publish_as_relid))
390 publish_as_relid = relid;
391 }
392
393 /* Fetch the column list */
394 pub = GetPublication(pubid);
395 check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
396
397 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
398 {
399 /* With REPLICA IDENTITY FULL, no column list is allowed. */
400 *invalid_column_list = (columns != NULL);
401
402 /*
403 * As we don't allow a column list with REPLICA IDENTITY FULL, the
404 * publish_generated_columns option must be set to stored if the table
405 * has any stored generated columns.
406 */
407 if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
408 relation->rd_att->constr &&
410 *invalid_gen_col = true;
411
412 /*
413 * Virtual generated columns are currently not supported for logical
414 * replication at all.
415 */
416 if (relation->rd_att->constr &&
418 *invalid_gen_col = true;
419
/* Both problems already found, so the per-column scan below is skipped. */
420 if (*invalid_gen_col && *invalid_column_list)
421 return true;
422 }
423
424 /* Remember columns that are part of the REPLICA IDENTITY */
425 idattrs = RelationGetIndexAttrBitmap(relation,
427
428 /*
429 * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
430 * (to handle system columns the usual way), while column list does not
431 * use offset, so we can't do bms_is_subset(). Instead, we have to loop
432 * over the idattrs and check all of them are in the list.
433 */
434 x = -1;
435 while ((x = bms_next_member(idattrs, x)) >= 0)
436 {
438 Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
439
440 if (columns == NULL)
441 {
442 /*
443 * The publish_generated_columns option must be set to stored if
444 * the REPLICA IDENTITY contains any stored generated column.
445 */
446 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
447 {
448 *invalid_gen_col = true;
449 break;
450 }
451
452 /*
453 * The equivalent setting for virtual generated columns does not
454 * exist yet.
455 */
456 if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
457 {
458 *invalid_gen_col = true;
459 break;
460 }
461
462 /* Skip validating the column list since it is not defined */
463 continue;
464 }
465
466 /*
467 * If pubviaroot is true, we are validating the column list of the
468 * parent table, but the bitmap contains the replica identity
469 * information of the child table. The parent/child attnums may not
470 * match, so translate them to the parent - get the attname from the
471 * child, and look it up in the parent.
472 */
473 if (pubviaroot)
474 {
475 /* attribute name in the child table */
476 char *colname = get_attname(relid, attnum, false);
477
478 /*
479 * Determine the attnum for the attribute name in parent (we are
480 * using the column list defined on the parent).
481 */
482 attnum = get_attnum(publish_as_relid, colname);
483 }
484
485 /* replica identity column, not covered by the column list */
486 *invalid_column_list |= !bms_is_member(attnum, columns);
487
488 if (*invalid_column_list && *invalid_gen_col)
489 break;
490 }
491
492 bms_free(columns);
493 bms_free(idattrs);
494
495 return *invalid_column_list || *invalid_gen_col;
496}
497
498/*
499 * Invalidate entries in the RelationSyncCache for relations included in the
500 * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
501 *
502 * If 'puballtables' is true, invalidate all cache entries.
 *
 * Otherwise the relation OIDs gathered from both sources are merged into a
 * single duplicate-free list before being processed one at a time.
 *
 * NOTE(review): listing lines 509 (the body of the puballtables branch), 523
 * and 525 (the remaining arguments of the two lookup calls) and 531 (the
 * body of the foreach_oid loop) are absent from this listing.  Restore them
 * from upstream.
503 */
504void
505InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
506{
507 if (puballtables)
508 {
510 }
511 else
512 {
513 List *relids = NIL;
514 List *schemarelids = NIL;
515
516 /*
517 * For partitioned tables, we must invalidate all partitions and
518 * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
519 * a target. However, WAL records for TRUNCATE specify both a root and
520 * its leaves.
521 */
522 relids = GetPublicationRelations(pubid,
524 schemarelids = GetAllSchemaPublicationRelations(pubid,
526
527 relids = list_concat_unique_oid(relids, schemarelids);
528
529 /* Invalidate the relsyncache */
530 foreach_oid(relid, relids)
532 }
533
534 return;
535}
536
537/* check_functions_in_node callback */
/*
 * Returns true when the function is not immutable or when its OID is at or
 * above FirstNormalObjectId.
 *
 * NOTE(review): listing line 539 (function name and parameters, presumably
 * including the "func_id" used below) is absent from this listing.  Restore
 * it from upstream.
 */
538static bool
540{
541 return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
542 func_id >= FirstNormalObjectId);
543}
544
545/*
546 * The row filter walker checks if the row filter expression is a "simple
547 * expression".
548 *
549 * It allows only simple or compound expressions such as:
550 * - (Var Op Const)
551 * - (Var Op Var)
552 * - (Var Op Const) AND/OR (Var Op Const)
553 * - etc
554 * (where Var is a column of the table this filter belongs to)
555 *
556 * The simple expression has the following restrictions:
557 * - User-defined operators are not allowed;
558 * - User-defined functions are not allowed;
559 * - User-defined types are not allowed;
560 * - User-defined collations are not allowed;
561 * - Non-immutable built-in functions are not allowed;
562 * - System columns are not allowed.
563 *
564 * NOTES
565 *
566 * We don't allow user-defined functions/operators/types/collations because
567 * (a) if a user drops a user-defined object used in a row filter expression or
568 * if there is any other error while using it, the logical decoding
569 * infrastructure won't be able to recover from such an error even if the
570 * object is recreated again because a historic snapshot is used to evaluate
571 * the row filter;
572 * (b) a user-defined function can be used to access tables that could have
573 * unpleasant results because a historic snapshot is used. That's why only
574 * immutable built-in functions are allowed in row filter expressions.
575 *
576 * We don't allow system columns because currently, we don't have that
577 * information in the tuple passed to downstream. Also, as we don't replicate
578 * those to subscribers, there doesn't seem to be a need for a filter on those
579 * columns.
580 *
581 * We can allow other node types after more analysis and testing.
 *
 * A NULL node returns false.  Any violation is recorded in errdetail_msg and
 * reported with the message "invalid publication WHERE expression".
 *
 * NOTE(review): listing lines 584 (function name and parameters, presumably
 * including the "node" and "pstate" used below), 662 and 666 (the start of
 * the function check and the second half of the collation check), 675 (the
 * opener of the error report) and 681 (the start of the final return
 * expression) are absent from this listing.  Restore them from upstream.
582 */
583static bool
585{
586 char *errdetail_msg = NULL;
587
588 if (node == NULL)
589 return false;
590
591 switch (nodeTag(node))
592 {
593 case T_Var:
594 /* System columns are not allowed. */
595 if (((Var *) node)->varattno < InvalidAttrNumber)
596 errdetail_msg = _("System columns are not allowed.");
597 break;
598 case T_OpExpr:
599 case T_DistinctExpr:
600 case T_NullIfExpr:
601 /* OK, except user-defined operators are not allowed. */
602 if (((OpExpr *) node)->opno >= FirstNormalObjectId)
603 errdetail_msg = _("User-defined operators are not allowed.");
604 break;
605 case T_ScalarArrayOpExpr:
606 /* OK, except user-defined operators are not allowed. */
607 if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
608 errdetail_msg = _("User-defined operators are not allowed.");
609
610 /*
611 * We don't need to check the hashfuncid and negfuncid of
612 * ScalarArrayOpExpr as those functions are only built for a
613 * subquery.
614 */
615 break;
616 case T_RowCompareExpr:
617 {
618 ListCell *opid;
619
620 /* OK, except user-defined operators are not allowed. */
621 foreach(opid, ((RowCompareExpr *) node)->opnos)
622 {
623 if (lfirst_oid(opid) >= FirstNormalObjectId)
624 {
625 errdetail_msg = _("User-defined operators are not allowed.");
626 break;
627 }
628 }
629 }
630 break;
631 case T_Const:
632 case T_FuncExpr:
633 case T_BoolExpr:
634 case T_RelabelType:
635 case T_CollateExpr:
636 case T_CaseExpr:
637 case T_CaseTestExpr:
638 case T_ArrayExpr:
639 case T_RowExpr:
640 case T_CoalesceExpr:
641 case T_MinMaxExpr:
642 case T_XmlExpr:
643 case T_NullTest:
644 case T_BooleanTest:
645 case T_List:
646 /* OK, supported */
647 break;
648 default:
649 errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
650 break;
651 }
652
653 /*
654 * For all the supported nodes, if we haven't already found a problem,
655 * check the types, functions, and collations used in it. We check List
656 * by walking through each element.
657 */
658 if (!errdetail_msg && !IsA(node, List))
659 {
660 if (exprType(node) >= FirstNormalObjectId)
661 errdetail_msg = _("User-defined types are not allowed.");
663 pstate))
664 errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
665 else if (exprCollation(node) >= FirstNormalObjectId ||
667 errdetail_msg = _("User-defined collations are not allowed.");
668 }
669
670 /*
671 * If we found a problem in this node, throw error now. Otherwise keep
672 * going.
673 */
674 if (errdetail_msg)
676 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
677 errmsg("invalid publication WHERE expression"),
678 errdetail_internal("%s", errdetail_msg),
679 parser_errposition(pstate, exprLocation(node))));
680
682 pstate);
683}
684
685/*
686 * Check if the row filter expression is a "simple expression".
687 *
688 * See check_simple_rowfilter_expr_walker for details.
 *
 * This is a thin entry point that forwards both arguments to the walker and
 * returns its result.
 *
 * NOTE(review): listing line 691 (function name and parameters, presumably
 * the "node" and "pstate" used below) is absent from this listing.  Restore
 * it from upstream.
689 */
690static bool
692{
693 return check_simple_rowfilter_expr_walker(node, pstate);
694}
695
696/*
697 * Transform the publication WHERE expression for all the relations in the list,
698 * ensuring it is coerced to boolean and necessary collation information is
699 * added if required, and add a new nsitem/RTE for the associated relation to
700 * the ParseState's namespace list.
701 *
702 * Also check the publication row filter expression and throw an error if
703 * anything not permitted or unexpected is encountered.
 *
 * Entries whose whereClause is NULL are skipped.  For the others the
 * transformed expression is stored back into the entry's whereClause,
 * replacing the original one.
 *
 * NOTE(review): listing lines 716 (presumably the declaration of "pri" taken
 * from the list cell), 728 (the opener of the error report), 731 (the
 * argument of the error message) and 747-748 (the middle arguments of
 * transformWhereClause) are absent from this listing.  Restore them from
 * upstream.
704 */
705static void
706TransformPubWhereClauses(List *tables, const char *queryString,
707 bool pubviaroot)
708{
709 ListCell *lc;
710
711 foreach(lc, tables)
712 {
713 ParseNamespaceItem *nsitem;
714 Node *whereclause = NULL;
715 ParseState *pstate;
717
718 if (pri->whereClause == NULL)
719 continue;
720
721 /*
722 * If the publication doesn't publish changes via the root partitioned
723 * table, the partition's row filter will be used. So disallow using
724 * WHERE clause on partitioned table in this case.
725 */
726 if (!pubviaroot &&
727 pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
729 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
730 errmsg("cannot use publication WHERE clause for relation \"%s\"",
732 errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
733 "publish_via_partition_root")));
734
735 /*
736 * A fresh pstate is required so that we only have "this" table in its
737 * rangetable
738 */
739 pstate = make_parsestate(NULL);
/* Gives error reports access to the statement text for position info. */
740 pstate->p_sourcetext = queryString;
741 nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
742 AccessShareLock, NULL,
743 false, false);
744 addNSItemToQuery(pstate, nsitem, false, true, true);
745
746 whereclause = transformWhereClause(pstate,
749 "PUBLICATION WHERE");
750
751 /* Fix up collation information */
752 assign_expr_collations(pstate, whereclause);
753
754 whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
755
756 /*
757 * We allow only simple expressions in row filters. See
758 * check_simple_rowfilter_expr_walker.
759 */
760 check_simple_rowfilter_expr(whereclause, pstate);
761
762 free_parsestate(pstate);
763
764 pri->whereClause = whereclause;
765 }
766}
767
768
769/*
770 * Given a list of tables that are going to be added to a publication,
771 * verify that they fulfill the necessary preconditions, namely: no tables
772 * have a column list if any schema is published; and partitioned tables do
773 * not have column lists if publish_via_partition_root is not set.
774 *
775 * 'publish_schema' indicates that the publication contains any TABLES IN
776 * SCHEMA elements (newly added in this command, or preexisting).
777 * 'pubviaroot' is the value of publish_via_partition_root.
 * 'pubname' is used only in the error messages.
 *
 * Entries without a column list (columns == NIL) are skipped.
 *
 * NOTE(review): listing lines 787 (presumably the declaration of "pri" taken
 * from the list cell), 803 and 817 (the openers of the two error reports),
 * and 806 and 820 (the first argument of each error message) are absent
 * from this listing.  Restore them from upstream.
778 */
779static void
780CheckPubRelationColumnList(char *pubname, List *tables,
781 bool publish_schema, bool pubviaroot)
782{
783 ListCell *lc;
784
785 foreach(lc, tables)
786 {
788
789 if (pri->columns == NIL)
790 continue;
791
792 /*
793 * Disallow specifying column list if any schema is in the
794 * publication.
795 *
796 * XXX We could instead just forbid the case when the publication
797 * tries to publish the table with a column list and a schema for that
798 * table. However, if we do that then we need a restriction during
799 * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
800 * seem to be a good idea.
801 */
802 if (publish_schema)
804 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
805 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
807 RelationGetRelationName(pri->relation), pubname),
808 errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
809
810 /*
811 * If the publication doesn't publish changes via the root partitioned
812 * table, the partition's column list will be used. So disallow using
813 * a column list on the partitioned table in this case.
814 */
815 if (!pubviaroot &&
816 pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
818 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
819 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
821 RelationGetRelationName(pri->relation), pubname),
822 errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
823 "publish_via_partition_root")));
824 }
825}
826
827/*
828 * Create new publication.
 *
 * Visible steps, in order: check CREATE privilege on the current database
 * and the superuser requirement for FOR ALL TABLES / ALL SEQUENCES; open
 * pg_publication with RowExclusiveLock; reject a name that is already in
 * use; build and insert the catalog tuple; record the owner dependency;
 * then attach the listed tables and schemas.  Returns the ObjectAddress of
 * the new publication.
 *
 * NOTE(review): many listing lines are absent from this listing: 830-831
 * (return type, function name and parameters, presumably including the
 * "pstate" and "stmt" used below), 852-853, 859, 870-871, 880, 883, 894,
 * 929, 939, 948, 957, 979 and 988-989.  The affected statements (error
 * report openers, the option-parsing call, the pubname value, and the calls
 * that follow several comments) are incomplete.  Restore them from upstream.
829 */
832{
833 Relation rel;
834 ObjectAddress myself;
835 Oid puboid;
836 bool nulls[Natts_pg_publication];
837 Datum values[Natts_pg_publication];
838 HeapTuple tup;
839 bool publish_given;
840 PublicationActions pubactions;
841 bool publish_via_partition_root_given;
842 bool publish_via_partition_root;
843 bool publish_generated_columns_given;
844 char publish_generated_columns;
845 AclResult aclresult;
846 List *relations = NIL;
847 List *schemaidlist = NIL;
848
849 /* must have CREATE privilege on database */
850 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
851 if (aclresult != ACLCHECK_OK)
854
855 /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
856 if (!superuser())
857 {
858 if (stmt->for_all_tables || stmt->for_all_sequences)
860 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
861 errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
862 }
863
864 rel = table_open(PublicationRelationId, RowExclusiveLock);
865
866 /* Check if name is used */
867 puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
868 CStringGetDatum(stmt->pubname));
869 if (OidIsValid(puboid))
872 errmsg("publication \"%s\" already exists",
873 stmt->pubname)));
874
875 /* Form a tuple. */
876 memset(values, 0, sizeof(values));
877 memset(nulls, false, sizeof(nulls));
878
879 values[Anum_pg_publication_pubname - 1] =
/* The publication is owned by the user running the command. */
881 values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
882
884 stmt->options,
885 &publish_given, &pubactions,
886 &publish_via_partition_root_given,
887 &publish_via_partition_root,
888 &publish_generated_columns_given,
889 &publish_generated_columns);
890
891 if (stmt->for_all_sequences &&
892 (publish_given || publish_via_partition_root_given ||
893 publish_generated_columns_given))
895 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
896 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
897
898 puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
899 Anum_pg_publication_oid);
900 values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
901 values[Anum_pg_publication_puballtables - 1] =
902 BoolGetDatum(stmt->for_all_tables);
903 values[Anum_pg_publication_puballsequences - 1] =
904 BoolGetDatum(stmt->for_all_sequences);
905 values[Anum_pg_publication_pubinsert - 1] =
906 BoolGetDatum(pubactions.pubinsert);
907 values[Anum_pg_publication_pubupdate - 1] =
908 BoolGetDatum(pubactions.pubupdate);
909 values[Anum_pg_publication_pubdelete - 1] =
910 BoolGetDatum(pubactions.pubdelete);
911 values[Anum_pg_publication_pubtruncate - 1] =
912 BoolGetDatum(pubactions.pubtruncate);
913 values[Anum_pg_publication_pubviaroot - 1] =
914 BoolGetDatum(publish_via_partition_root);
915 values[Anum_pg_publication_pubgencols - 1] =
916 CharGetDatum(publish_generated_columns);
917
918 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
919
920 /* Insert tuple into catalog. */
921 CatalogTupleInsert(rel, tup);
922 heap_freetuple(tup);
923
924 recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
925
926 ObjectAddressSet(myself, PublicationRelationId, puboid);
927
928 /* Make the changes visible. */
930
931 /* Associate objects with the publication. */
932 if (stmt->for_all_tables)
933 {
934 /*
935 * Invalidate relcache so that publication info is rebuilt. Sequences
936 * publication doesn't require invalidation, as replica identity
937 * checks don't apply to them.
938 */
940 }
941 else if (!stmt->for_all_sequences)
942 {
943 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
944 &schemaidlist);
945
946 /* FOR TABLES IN SCHEMA requires superuser */
947 if (schemaidlist != NIL && !superuser())
949 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
950 errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
951
952 if (relations != NIL)
953 {
954 List *rels;
955
956 rels = OpenTableList(relations);
958 publish_via_partition_root);
959
960 CheckPubRelationColumnList(stmt->pubname, rels,
961 schemaidlist != NIL,
962 publish_via_partition_root);
963
964 PublicationAddTables(puboid, rels, true, NULL);
965 CloseTableList(rels);
966 }
967
968 if (schemaidlist != NIL)
969 {
970 /*
971 * Schema lock is held until the publication is created to prevent
972 * concurrent schema deletion.
973 */
974 LockSchemaList(schemaidlist);
975 PublicationAddSchemas(puboid, schemaidlist, true, NULL);
976 }
977 }
978
980
981 InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
982
983 /*
984 * We don't need this warning message when wal_level >= 'replica' since
985 * logical decoding is automatically enabled up on a logical slot
986 * creation.
987 */
990 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
991 errmsg("logical decoding must be enabled to publish logical changes"),
992 errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
993
994 return myself;
995}
996
997/*
998 * Change options of a publication.
999 */
1000static void
1002 Relation rel, HeapTuple tup)
1003{
1004 bool nulls[Natts_pg_publication];
1005 bool replaces[Natts_pg_publication];
1006 Datum values[Natts_pg_publication];
1007 bool publish_given;
1008 PublicationActions pubactions;
1009 bool publish_via_partition_root_given;
1010 bool publish_via_partition_root;
1011 bool publish_generated_columns_given;
1012 char publish_generated_columns;
1013 ObjectAddress obj;
1014 Form_pg_publication pubform;
1015 List *root_relids = NIL;
1016 ListCell *lc;
1017
1018 pubform = (Form_pg_publication) GETSTRUCT(tup);
1019
1021 stmt->options,
1022 &publish_given, &pubactions,
1023 &publish_via_partition_root_given,
1024 &publish_via_partition_root,
1025 &publish_generated_columns_given,
1026 &publish_generated_columns);
1027
1028 if (pubform->puballsequences &&
1029 (publish_given || publish_via_partition_root_given ||
1030 publish_generated_columns_given))
1032 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1033 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
1034
1035 /*
1036 * If the publication doesn't publish changes via the root partitioned
1037 * table, the partition's row filter and column list will be used. So
1038 * disallow using WHERE clause and column lists on partitioned table in
1039 * this case.
1040 */
1041 if (!pubform->puballtables && publish_via_partition_root_given &&
1042 !publish_via_partition_root)
1043 {
1044 /*
1045 * Lock the publication so nobody else can do anything with it. This
1046 * prevents concurrent alter to add partitioned table(s) with WHERE
1047 * clause(s) and/or column lists which we don't allow when not
1048 * publishing via root.
1049 */
1050 LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
1052
1053 root_relids = GetPublicationRelations(pubform->oid,
1055
1056 foreach(lc, root_relids)
1057 {
1058 Oid relid = lfirst_oid(lc);
1059 HeapTuple rftuple;
1060 char relkind;
1061 char *relname;
1062 bool has_rowfilter;
1063 bool has_collist;
1064
1065 /*
1066 * Beware: we don't have lock on the relations, so cope silently
1067 * with the cache lookups returning NULL.
1068 */
1069
1070 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1071 ObjectIdGetDatum(relid),
1072 ObjectIdGetDatum(pubform->oid));
1073 if (!HeapTupleIsValid(rftuple))
1074 continue;
1075 has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1076 has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1077 if (!has_rowfilter && !has_collist)
1078 {
1079 ReleaseSysCache(rftuple);
1080 continue;
1081 }
1082
1083 relkind = get_rel_relkind(relid);
1084 if (relkind != RELKIND_PARTITIONED_TABLE)
1085 {
1086 ReleaseSysCache(rftuple);
1087 continue;
1088 }
1089 relname = get_rel_name(relid);
1090 if (relname == NULL) /* table concurrently dropped */
1091 {
1092 ReleaseSysCache(rftuple);
1093 continue;
1094 }
1095
1096 if (has_rowfilter)
1097 ereport(ERROR,
1098 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1099 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1100 "publish_via_partition_root",
1101 stmt->pubname),
1102 errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1103 relname, "publish_via_partition_root")));
1104 Assert(has_collist);
1105 ereport(ERROR,
1106 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1107 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1108 "publish_via_partition_root",
1109 stmt->pubname),
1110 errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1111 relname, "publish_via_partition_root")));
1112 }
1113 }
1114
1115 /* Everything ok, form a new tuple. */
1116 memset(values, 0, sizeof(values));
1117 memset(nulls, false, sizeof(nulls));
1118 memset(replaces, false, sizeof(replaces));
1119
1120 if (publish_given)
1121 {
1122 values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1123 replaces[Anum_pg_publication_pubinsert - 1] = true;
1124
1125 values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1126 replaces[Anum_pg_publication_pubupdate - 1] = true;
1127
1128 values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1129 replaces[Anum_pg_publication_pubdelete - 1] = true;
1130
1131 values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1132 replaces[Anum_pg_publication_pubtruncate - 1] = true;
1133 }
1134
1135 if (publish_via_partition_root_given)
1136 {
1137 values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1138 replaces[Anum_pg_publication_pubviaroot - 1] = true;
1139 }
1140
1141 if (publish_generated_columns_given)
1142 {
1143 values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
1144 replaces[Anum_pg_publication_pubgencols - 1] = true;
1145 }
1146
1147 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1148 replaces);
1149
1150 /* Update the catalog. */
1151 CatalogTupleUpdate(rel, &tup->t_self, tup);
1152
1154
1155 pubform = (Form_pg_publication) GETSTRUCT(tup);
1156
1157 /* Invalidate the relcache. */
1158 if (pubform->puballtables)
1159 {
1161 }
1162 else
1163 {
1164 List *relids = NIL;
1165 List *schemarelids = NIL;
1166
1167 /*
1168 * For any partitioned tables contained in the publication, we must
1169 * invalidate all partitions contained in the respective partition
1170 * trees, not just those explicitly mentioned in the publication.
1171 */
1172 if (root_relids == NIL)
1173 relids = GetPublicationRelations(pubform->oid,
1175 else
1176 {
1177 /*
1178 * We already got tables explicitly mentioned in the publication.
1179 * Now get all partitions for the partitioned table in the list.
1180 */
1181 foreach(lc, root_relids)
1182 relids = GetPubPartitionOptionRelations(relids,
1184 lfirst_oid(lc));
1185 }
1186
1187 schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1189 relids = list_concat_unique_oid(relids, schemarelids);
1190
1192 }
1193
1194 ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1196 (Node *) stmt);
1197
1198 InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1199}
1200
1201/*
1202 * Invalidate the relations.
 *
 * Walks the list iterated below as "relids" and invalidates the cached state
 * of those relations.  Per the comment in the body there are two strategies:
 * one invalidation per relation, or a reset of the whole relcache once the
 * number of individual messages would become too large.
 *
 * NOTE(review): this listing omits source lines 1205 (function-name line),
 * 1211 (the size test) and 1216/1219 (the two invalidation calls), so the
 * threshold and the exact routines called cannot be confirmed from here.
1203 */
1204void
1206{
1207 /*
1208 * We don't want to send too many individual messages, at some point it's
1209 * cheaper to just reset whole relcache.
1210 */
1212 {
1213 ListCell *lc;
1214
1215 foreach(lc, relids)
1217 }
1218 else
1220}
1221
1222/*
1223 * Add or remove table to/from publication.
 *
 * Dispatches on stmt->action:
 *   AP_AddObjects  - transform the WHERE clauses, validate the column lists
 *                    and add every listed table (an already-present table is
 *                    not tolerated: if_not_exists is false).
 *   AP_DropObjects - drop every listed table (missing_ok is false).
 *   AP_SetObjects  - drop each existing table that is not listed again with
 *                    an equal WHERE clause and an equal column list, then
 *                    add the whole new list with if_not_exists set to true.
 *
 * "tables" is the parsed table list, "queryString" is handed to
 * TransformPubWhereClauses, and "publish_schema" says whether the statement
 * also names schemas; for ADD it is additionally set when the publication
 * already is a schema publication.
 *
 * NOTE(review): this listing omits source lines 1226 (function name and
 * leading parameters), 1231 (declaration of pubform), 1260, 1358 and 1362.
1224 */
1225static void
1227 List *tables, const char *queryString,
1228 bool publish_schema)
1229{
1230 List *rels = NIL;
1232 Oid pubid = pubform->oid;
1233
1234 /*
1235 * Nothing to do if no objects, except in SET: for that it is quite
1236 * possible that user has not specified any tables in which case we need
1237 * to remove all the existing tables.
1238 */
1239 if (!tables && stmt->action != AP_SetObjects)
1240 return;
1241
 /* Open (and lock) everything the user named; closed again at the end. */
1242 rels = OpenTableList(tables);
1243
1244 if (stmt->action == AP_AddObjects)
1245 {
1246 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1247
 /* Schemas already in the publication count as well. */
1248 publish_schema |= is_schema_publication(pubid);
1249
1250 CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1251 pubform->pubviaroot);
1252
1253 PublicationAddTables(pubid, rels, false, stmt);
1254 }
1255 else if (stmt->action == AP_DropObjects)
1256 PublicationDropTables(pubid, rels, false);
1257 else /* AP_SetObjects */
1258 {
1259 List *oldrelids = GetPublicationRelations(pubid,
1261 List *delrels = NIL;
1262 ListCell *oldlc;
1263
1264 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1265
1266 CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1267 pubform->pubviaroot);
1268
1269 /*
1270 * To recreate the relation list for the publication, look for
1271 * existing relations that do not need to be dropped.
1272 */
1273 foreach(oldlc, oldrelids)
1274 {
1275 Oid oldrelid = lfirst_oid(oldlc);
1276 ListCell *newlc;
1277 PublicationRelInfo *oldrel;
1278 bool found = false;
1279 HeapTuple rftuple;
1280 Node *oldrelwhereclause = NULL;
1281 Bitmapset *oldcolumns = NULL;
1282
1283 /* look up the cache for the old relmap */
1284 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1285 ObjectIdGetDatum(oldrelid),
1286 ObjectIdGetDatum(pubid));
1287
1288 /*
1289 * See if the existing relation currently has a WHERE clause or a
1290 * column list. We need to compare those too.
1291 */
1292 if (HeapTupleIsValid(rftuple))
1293 {
1294 bool isnull = true;
1295 Datum whereClauseDatum;
1296 Datum columnListDatum;
1297
1298 /* Load the WHERE clause for this table. */
1299 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1300 Anum_pg_publication_rel_prqual,
1301 &isnull);
1302 if (!isnull)
1303 oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1304
1305 /* Transform the int2vector column list to a bitmap. */
1306 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1307 Anum_pg_publication_rel_prattrs,
1308 &isnull);
1309
1310 if (!isnull)
1311 oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1312
1313 ReleaseSysCache(rftuple);
1314 }
1315
 /* Search the new list for an entry identical to the old one. */
1316 foreach(newlc, rels)
1317 {
1318 PublicationRelInfo *newpubrel;
1319 Oid newrelid;
1320 Bitmapset *newcolumns = NULL;
1321
1322 newpubrel = (PublicationRelInfo *) lfirst(newlc);
1323 newrelid = RelationGetRelid(newpubrel->relation);
1324
1325 /*
1326 * Validate the column list. If the column list or WHERE
1327 * clause changes, then the validation done here will be
1328 * duplicated inside PublicationAddTables(). The validation
1329 * is cheap enough that that seems harmless.
1330 */
1331 newcolumns = pub_collist_validate(newpubrel->relation,
1332 newpubrel->columns);
1333
1334 /*
1335 * Check if any of the new set of relations matches with the
1336 * existing relations in the publication. Additionally, if the
1337 * relation has an associated WHERE clause, check the WHERE
1338 * expressions also match. Same for the column list. Drop the
1339 * rest.
1340 */
1341 if (newrelid == oldrelid)
1342 {
1343 if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1344 bms_equal(oldcolumns, newcolumns))
1345 {
1346 found = true;
1347 break;
1348 }
1349 }
1350 }
1351
1352 /*
1353 * Add the non-matched relations to a list so that they can be
1354 * dropped.
1355 */
1356 if (!found)
1357 {
1359 oldrel->whereClause = NULL;
1360 oldrel->columns = NIL;
1361 oldrel->relation = table_open(oldrelid,
1363 delrels = lappend(delrels, oldrel);
1364 }
1365 }
1366
1367 /* And drop them. */
1368 PublicationDropTables(pubid, delrels, true);
1369
1370 /*
1371 * Don't bother calculating the difference for adding, we'll catch and
1372 * skip existing ones when doing catalog update.
1373 */
1374 PublicationAddTables(pubid, rels, true, stmt);
1375
1376 CloseTableList(delrels);
1377 }
1378
1379 CloseTableList(rels);
1380}
1381
1382/*
1383 * Alter the publication schemas.
1384 *
1385 * Add or remove schemas to/from publication.
 *
 * Dispatches on stmt->action:
 *   AP_AddObjects  - refuse if any table already in the publication has a
 *                    column list, otherwise add the listed schemas.
 *   AP_DropObjects - drop the listed schemas (missing_ok is false).
 *   AP_SetObjects  - drop the schemas currently in the publication that are
 *                    not in the new list, then add the new list with
 *                    if_not_exists set to true.
 *
 * "schemaidlist" is a list of schema OIDs; the schemas are locked through
 * LockSchemaList before any catalog change is made.
 *
 * NOTE(review): this listing omits source lines 1388 (function name and
 * leading parameters), 1391 (declaration of pubform) and 1418 (first key
 * passed to SearchSysCache2).
1386 */
1387static void
1389 HeapTuple tup, List *schemaidlist)
1390{
1392
1393 /*
1394 * Nothing to do if no objects, except in SET: for that it is quite
1395 * possible that user has not specified any schemas in which case we need
1396 * to remove all the existing schemas.
1397 */
1398 if (!schemaidlist && stmt->action != AP_SetObjects)
1399 return;
1400
1401 /*
1402 * Schema lock is held until the publication is altered to prevent
1403 * concurrent schema deletion.
1404 */
1405 LockSchemaList(schemaidlist);
1406 if (stmt->action == AP_AddObjects)
1407 {
1408 ListCell *lc;
1409 List *reloids;
1410
1411 reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1412
 /* Inspect the publication mapping of every table already published. */
1413 foreach(lc, reloids)
1414 {
1415 HeapTuple coltuple;
1416
1417 coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1419 ObjectIdGetDatum(pubform->oid));
1420
1421 if (!HeapTupleIsValid(coltuple))
1422 continue;
1423
1424 /*
1425 * Disallow adding schema if column list is already part of the
1426 * publication. See CheckPubRelationColumnList.
1427 */
1428 if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1429 ereport(ERROR,
1430 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1431 errmsg("cannot add schema to publication \"%s\"",
1432 stmt->pubname),
1433 errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1434
1435 ReleaseSysCache(coltuple);
1436 }
1437
1438 PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1439 }
1440 else if (stmt->action == AP_DropObjects)
1441 PublicationDropSchemas(pubform->oid, schemaidlist, false);
1442 else /* AP_SetObjects */
1443 {
1444 List *oldschemaids = GetPublicationSchemas(pubform->oid);
1445 List *delschemas = NIL;
1446
1447 /* Identify which schemas should be dropped */
1448 delschemas = list_difference_oid(oldschemaids, schemaidlist);
1449
1450 /*
1451 * Schema lock is held until the publication is altered to prevent
1452 * concurrent schema deletion.
1453 */
1454 LockSchemaList(delschemas);
1455
1456 /* And drop them */
1457 PublicationDropSchemas(pubform->oid, delschemas, true);
1458
1459 /*
1460 * Don't bother calculating the difference for adding, we'll catch and
1461 * skip existing ones when doing catalog update.
1462 */
1463 PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1464 }
1465}
1466
1467/*
1468 * Check if relations and schemas can be in a given publication and throw
1469 * appropriate error if not.
 *
 * Three independent checks are made, each raising ERROR on failure:
 *   1. ADD or SET with a non-empty schema list requires superuser.
 *   2. A non-empty schema list is rejected for a publication flagged
 *      puballtables and/or puballsequences.
 *   3. A non-empty table list is rejected for such a publication as well.
 * Returns normally when none of them fires.
 *
 * NOTE(review): this listing omits source lines 1472 (function name and
 * leading parameters) and 1475 (declaration of pubform).
1470 */
1471static void
1473 List *tables, List *schemaidlist)
1474{
1476
1477 if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1478 schemaidlist && !superuser())
1479 ereport(ERROR,
1480 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1481 errmsg("must be superuser to add or set schemas")));
1482
1483 /*
1484 * Check that user is allowed to manipulate the publication tables in
1485 * schema
1486 */
1487 if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
1488 {
 /* Pick the message matching the exact combination of flags. */
1489 if (pubform->puballtables && pubform->puballsequences)
1490 ereport(ERROR,
1491 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1492 errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1493 NameStr(pubform->pubname)),
1494 errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1495 else if (pubform->puballtables)
1496 ereport(ERROR,
1497 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1498 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1499 NameStr(pubform->pubname)),
1500 errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
1501 else
1502 ereport(ERROR,
1503 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1504 errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1505 NameStr(pubform->pubname)),
1506 errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1507 }
1508
1509 /* Check that user is allowed to manipulate the publication tables. */
1510 if (tables && (pubform->puballtables || pubform->puballsequences))
1511 {
1512 if (pubform->puballtables && pubform->puballsequences)
1513 ereport(ERROR,
1514 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1515 errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1516 NameStr(pubform->pubname)),
1517 errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
1518 else if (pubform->puballtables)
1519 ereport(ERROR,
1520 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1521 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1522 NameStr(pubform->pubname)),
1523 errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
1524 else
1525 ereport(ERROR,
1526 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1527 errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1528 NameStr(pubform->pubname)),
1529 errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1530 }
1531}
1532
1533/*
1534 * Alter the existing publication.
1535 *
1536 * This is dispatcher function for AlterPublicationOptions,
1537 * AlterPublicationSchemas and AlterPublicationTables.
 *
 * The publication is looked up by stmt->pubname (ERROR if it does not exist)
 * and an ownership check is made.  A statement carrying options is handed to
 * AlterPublicationOptions; otherwise the object list is split into relations
 * and schema OIDs, validated by CheckAlterPublication, the publication is
 * locked and re-fetched by OID, and the tables and then the schemas are
 * altered.
 *
 * NOTE(review): this listing omits source lines 1540 (function name and
 * parameters), 1561 (first line of the ownership-failure call), 1581 (lock
 * mode passed to LockDatabaseObject) and 1603.
1538 */
1539void
1541{
1542 Relation rel;
1543 HeapTuple tup;
1544 Form_pg_publication pubform;
1545
1546 rel = table_open(PublicationRelationId, RowExclusiveLock);
1547
 /* Work on a private copy of the tuple; it is freed below. */
1548 tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1549 CStringGetDatum(stmt->pubname));
1550
1551 if (!HeapTupleIsValid(tup))
1552 ereport(ERROR,
1553 (errcode(ERRCODE_UNDEFINED_OBJECT),
1554 errmsg("publication \"%s\" does not exist",
1555 stmt->pubname)));
1556
1557 pubform = (Form_pg_publication) GETSTRUCT(tup);
1558
1559 /* must be owner */
1560 if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1562 stmt->pubname);
1563
1564 if (stmt->options)
1565 AlterPublicationOptions(pstate, stmt, rel, tup);
1566 else
1567 {
1568 List *relations = NIL;
1569 List *schemaidlist = NIL;
 /* Remember the OID: tup is freed before the publication is re-read. */
1570 Oid pubid = pubform->oid;
1571
1572 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1573 &schemaidlist);
1574
1575 CheckAlterPublication(stmt, tup, relations, schemaidlist);
1576
1577 heap_freetuple(tup);
1578
1579 /* Lock the publication so nobody else can do anything with it. */
1580 LockDatabaseObject(PublicationRelationId, pubid, 0,
1582
1583 /*
1584 * It is possible that by the time we acquire the lock on publication,
1585 * concurrent DDL has removed it. We can test this by checking the
1586 * existence of publication. We get the tuple again to avoid the risk
1587 * of any publication option getting changed.
1588 */
1589 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1590 if (!HeapTupleIsValid(tup))
1591 ereport(ERROR,
1592 errcode(ERRCODE_UNDEFINED_OBJECT),
1593 errmsg("publication \"%s\" does not exist",
1594 stmt->pubname));
1595
1596 AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1597 schemaidlist != NIL);
1598 AlterPublicationSchemas(stmt, tup, schemaidlist);
1599 }
1600
1601 /* Cleanup. */
1602 heap_freetuple(tup);
1604}
1605
1606/*
1607 * Remove relation from publication by mapping OID.
 *
 * "proid" is the OID of the publication/relation mapping row.  The row is
 * looked up through the PUBLICATIONREL syscache (elog ERROR if it is not
 * found) and deleted from the catalog opened with
 * PublicationRelRelationId.
 *
 * NOTE(review): this listing omits source lines 1610 (function name and
 * parameters), 1614 (declaration of pubrel), 1635 and 1638 (the calls that
 * build and invalidate the relation list) and 1644.
1608 */
1609void
1611{
1612 Relation rel;
1613 HeapTuple tup;
1615 List *relids = NIL;
1616
1617 rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1618
1619 tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1620
1621 if (!HeapTupleIsValid(tup))
1622 elog(ERROR, "cache lookup failed for publication table %u",
1623 proid);
1624
1625 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1626
1627 /*
1628 * Invalidate relcache so that publication info is rebuilt.
1629 *
1630 * For the partitioned tables, we must invalidate all partitions contained
1631 * in the respective partition hierarchies, not just the one explicitly
1632 * mentioned in the publication. This is required because we implicitly
1633 * publish the child tables when the parent table is published.
1634 */
1636 pubrel->prrelid);
1637
1639
1640 CatalogTupleDelete(rel, &tup->t_self);
1641
1642 ReleaseSysCache(tup);
1643
1645}
1646
1647/*
1648 * Remove the publication by mapping OID.
 *
 * "pubid" is the OID of the publication row.  The row is looked up through
 * the PUBLICATIONOID syscache (elog ERROR if it is not found) and deleted
 * from the catalog opened with PublicationRelationId.  A publication flagged
 * puballtables gets an extra invalidation step first.
 *
 * NOTE(review): this listing omits source lines 1651 (function name and
 * parameters), 1667 (the statement guarded by the puballtables test) and
 * 1673, so what exactly is invalidated cannot be confirmed from here.
1649 */
1650void
1652{
1653 Relation rel;
1654 HeapTuple tup;
1655 Form_pg_publication pubform;
1656
1657 rel = table_open(PublicationRelationId, RowExclusiveLock);
1658
1659 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1660 if (!HeapTupleIsValid(tup))
1661 elog(ERROR, "cache lookup failed for publication %u", pubid);
1662
1663 pubform = (Form_pg_publication) GETSTRUCT(tup);
1664
1665 /* Invalidate relcache so that publication info is rebuilt. */
1666 if (pubform->puballtables)
1668
1669 CatalogTupleDelete(rel, &tup->t_self);
1670
1671 ReleaseSysCache(tup);
1672
1674}
1675
1676/*
1677 * Remove schema from publication by mapping OID.
 *
 * "psoid" is the OID of the publication/schema mapping row.  The row is
 * looked up through the PUBLICATIONNAMESPACE syscache (elog ERROR if it is
 * not found); the relations of the mapped schema are invalidated via
 * InvalidatePublicationRels and the row is then deleted from the catalog
 * opened with PublicationNamespaceRelationId.
 *
 * NOTE(review): this listing omits source lines 1680 (function name and
 * parameters), 1685 (declaration of pubsch), 1694 (its assignment), 1702
 * (second argument of GetSchemaPublicationRelations) and 1709.
1678 */
1679void
1681{
1682 Relation rel;
1683 HeapTuple tup;
1684 List *schemaRels = NIL;
1686
1687 rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1688
1689 tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1690
1691 if (!HeapTupleIsValid(tup))
1692 elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1693
1695
1696 /*
1697 * Invalidate relcache so that publication info is rebuilt. See
1698 * RemovePublicationRelById for why we need to consider all the
1699 * partitions.
1700 */
1701 schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1703 InvalidatePublicationRels(schemaRels);
1704
1705 CatalogTupleDelete(rel, &tup->t_self);
1706
1707 ReleaseSysCache(tup);
1708
1710}
1711
1712/*
1713 * Open relations specified by a PublicationTable list.
1714 * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1715 * add them to a publication.
 *
 * Returns a list of PublicationRelInfo, one per distinct relation, each
 * carrying the opened relation together with the WHERE clause and column
 * list given for it.  A relation named more than once is kept only once,
 * and naming it again is an error when any of the mentions carries a WHERE
 * clause or a column list.  When recursion is requested for a relation that
 * is not a partitioned table, its children are appended too and inherit the
 * parent's WHERE clause and column list.
 *
 * Three OID lists are kept while scanning: "relids" (everything seen so
 * far), "relids_with_rf" (seen with a WHERE clause) and
 * "relids_with_collist" (seen with a column list).
 *
 * NOTE(review): this listing omits many source lines, among them 1718
 * (function name), 1731 (declaration of t), 1738/1802 (the statements under
 * the "Allow query cancel" comments), 1740 (opening of rel), 1766, 1770 and
 * 1839 (allocation of pub_rel) and 1794 (start of the call producing
 * "children"); the errcode and argument lines of the four ereport calls are
 * missing as well.
1716 */
1717static List *
1719{
1720 List *relids = NIL;
1721 List *rels = NIL;
1722 ListCell *lc;
1723 List *relids_with_rf = NIL;
1724 List *relids_with_collist = NIL;
1725
1726 /*
1727 * Open, share-lock, and check all the explicitly-specified relations
1728 */
1729 foreach(lc, tables)
1730 {
1732 bool recurse = t->relation->inh;
1733 Relation rel;
1734 Oid myrelid;
1735 PublicationRelInfo *pub_rel;
1736
1737 /* Allow query cancel in case this takes a long time */
1739
1741 myrelid = RelationGetRelid(rel);
1742
1743 /*
1744 * Filter out duplicates if user specifies "foo, foo".
1745 *
1746 * Note that this algorithm is known to not be very efficient (O(N^2))
1747 * but given that it only works on list of tables given to us by user
1748 * it's deemed acceptable.
1749 */
1750 if (list_member_oid(relids, myrelid))
1751 {
1752 /* Disallow duplicate tables if there are any with row filters. */
1753 if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1754 ereport(ERROR,
1756 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1758
1759 /* Disallow duplicate tables if there are any with column lists. */
1760 if (t->columns || list_member_oid(relids_with_collist, myrelid))
1761 ereport(ERROR,
1763 errmsg("conflicting or redundant column lists for table \"%s\"",
1765
1767 continue;
1768 }
1769
 /* First mention of this relation: record it and its filters. */
1771 pub_rel->relation = rel;
1772 pub_rel->whereClause = t->whereClause;
1773 pub_rel->columns = t->columns;
1774 rels = lappend(rels, pub_rel);
1775 relids = lappend_oid(relids, myrelid);
1776
1777 if (t->whereClause)
1778 relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1779
1780 if (t->columns)
1781 relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1782
1783 /*
1784 * Add children of this rel, if requested, so that they too are added
1785 * to the publication. A partitioned table can't have any inheritance
1786 * children other than its partitions, which need not be explicitly
1787 * added to the publication.
1788 */
1789 if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1790 {
1791 List *children;
1792 ListCell *child;
1793
1795 NULL);
1796
1797 foreach(child, children)
1798 {
1799 Oid childrelid = lfirst_oid(child);
1800
1801 /* Allow query cancel in case this takes a long time */
1803
1804 /*
1805 * Skip duplicates if user specified both parent and child
1806 * tables.
1807 */
1808 if (list_member_oid(relids, childrelid))
1809 {
1810 /*
1811 * We don't allow to specify row filter for both parent
1812 * and child table at the same time as it is not very
1813 * clear which one should be given preference.
1814 */
1815 if (childrelid != myrelid &&
1816 (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1817 ereport(ERROR,
1819 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1821
1822 /*
1823 * We don't allow to specify column list for both parent
1824 * and child table at the same time as it is not very
1825 * clear which one should be given preference.
1826 */
1827 if (childrelid != myrelid &&
1828 (t->columns || list_member_oid(relids_with_collist, childrelid)))
1829 ereport(ERROR,
1831 errmsg("conflicting or redundant column lists for table \"%s\"",
1833
1834 continue;
1835 }
1836
1837 /* find_all_inheritors already got lock */
1838 rel = table_open(childrelid, NoLock);
1840 pub_rel->relation = rel;
1841 /* child inherits WHERE clause from parent */
1842 pub_rel->whereClause = t->whereClause;
1843
1844 /* child inherits column list from parent */
1845 pub_rel->columns = t->columns;
1846 rels = lappend(rels, pub_rel);
1847 relids = lappend_oid(relids, childrelid);
1848
1849 if (t->whereClause)
1850 relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1851
1852 if (t->columns)
1853 relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1854 }
1855 }
1856 }
1857
 /*
  * NOTE(review): relids_with_collist is not freed here, unlike the two
  * other scratch lists - confirm whether that is intentional.
  */
1858 list_free(relids);
1859 list_free(relids_with_rf);
1860
1861 return rels;
1862}
1863
1864/*
1865 * Close all relations in the list.
1866 */
1867static void
1869{
1870 ListCell *lc;
1871
1872 foreach(lc, rels)
1873 {
1874 PublicationRelInfo *pub_rel;
1875
1876 pub_rel = (PublicationRelInfo *) lfirst(lc);
1877 table_close(pub_rel->relation, NoLock);
1878 }
1879
1880 list_free_deep(rels);
1881}
1882
1883/*
1884 * Lock the schemas specified in the schema list in AccessShareLock mode in
1885 * order to prevent concurrent schema deletion.
1886 */
1887static void
1889{
1890 ListCell *lc;
1891
1892 foreach(lc, schemalist)
1893 {
1894 Oid schemaid = lfirst_oid(lc);
1895
1896 /* Allow query cancel in case this takes a long time */
1898 LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1899
1900 /*
1901 * It is possible that by the time we acquire the lock on schema,
1902 * concurrent DDL has removed it. We can test this by checking the
1903 * existence of schema.
1904 */
1905 if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1906 ereport(ERROR,
1907 errcode(ERRCODE_UNDEFINED_SCHEMA),
1908 errmsg("schema with OID %u does not exist", schemaid));
1909 }
1910}
1911
1912/*
1913 * Add listed tables to the publication.
 *
 * "rels" is a list of PublicationRelInfo.  For every entry an ownership
 * check on the table is made for the current user, and the table is then
 * added through publication_add_relation, to which "if_not_exists" is
 * passed on.  When "stmt" is not NULL the object post-create hook is invoked
 * for the new mapping.
 *
 * NOTE(review): this listing omits source lines 1917 (the last parameter,
 * used below as "stmt"), 1929-1930 (the statement run when the ownership
 * check fails) and 1935 (first line of the call taking "(Node *) stmt").
1914 */
1915static void
1916PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1918{
1919 ListCell *lc;
1920
1921 foreach(lc, rels)
1922 {
1923 PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1924 Relation rel = pub_rel->relation;
1925 ObjectAddress obj;
1926
1927 /* Must be owner of the table or superuser. */
1928 if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1931
1932 obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1933 if (stmt)
1934 {
1936 (Node *) stmt);
1937
1938 InvokeObjectPostCreateHook(PublicationRelRelationId,
1939 obj.objectId, 0);
1940 }
1941 }
1942}
1943
1944/*
1945 * Remove listed tables from the publication.
 *
 * "rels" is a list whose entries are read as "pubrel" below.  For each entry:
 *   - a column list is rejected with a syntax error;
 *   - the publication/relation mapping OID is looked up; when there is
 *     none, the entry is skipped if "missing_ok" is true and an ERROR is
 *     raised otherwise;
 *   - a WHERE clause is rejected with a syntax error;
 *   - the mapping is removed through performDeletion with DROP_CASCADE.
 *
 * NOTE(review): this listing omits source lines 1956 (declaration of
 * pubrel) and 1976 (argument of the "is not part of the publication"
 * message).
1946 */
1947static void
1948PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1949{
1950 ObjectAddress obj;
1951 ListCell *lc;
1952 Oid prid;
1953
1954 foreach(lc, rels)
1955 {
1957 Relation rel = pubrel->relation;
1958 Oid relid = RelationGetRelid(rel);
1959
1960 if (pubrel->columns)
1961 ereport(ERROR,
1962 errcode(ERRCODE_SYNTAX_ERROR),
1963 errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1964
1965 prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1966 ObjectIdGetDatum(relid),
1967 ObjectIdGetDatum(pubid));
1968 if (!OidIsValid(prid))
1969 {
1970 if (missing_ok)
1971 continue;
1972
1973 ereport(ERROR,
1974 (errcode(ERRCODE_UNDEFINED_OBJECT),
1975 errmsg("relation \"%s\" is not part of the publication",
1977 }
1978
1979 if (pubrel->whereClause)
1980 ereport(ERROR,
1981 (errcode(ERRCODE_SYNTAX_ERROR),
1982 errmsg("cannot use a WHERE clause when removing a table from a publication")));
1983
1984 ObjectAddressSet(obj, PublicationRelRelationId, prid);
1985 performDeletion(&obj, DROP_CASCADE, 0);
1986 }
1987}
1988
1989/*
1990 * Add listed schemas to the publication.
 *
 * "schemas" is a list of schema OIDs.  Each one is added through
 * publication_add_schema, to which "if_not_exists" is passed on.  When
 * "stmt" is not NULL the object post-create hook is invoked for the new
 * mapping.
 *
 * NOTE(review): this listing omits source lines 1994 (the last parameter,
 * used below as "stmt") and 2006 (first line of the call taking
 * "(Node *) stmt").
1991 */
1992static void
1993PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1995{
1996 ListCell *lc;
1997
1998 foreach(lc, schemas)
1999 {
2000 Oid schemaid = lfirst_oid(lc);
2001 ObjectAddress obj;
2002
2003 obj = publication_add_schema(pubid, schemaid, if_not_exists);
2004 if (stmt)
2005 {
2007 (Node *) stmt);
2008
2009 InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
2010 obj.objectId, 0);
2011 }
2012 }
2013}
2014
2015/*
2016 * Remove listed schemas from the publication.
2017 */
2018static void
2019PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
2020{
2021 ObjectAddress obj;
2022 ListCell *lc;
2023 Oid psid;
2024
2025 foreach(lc, schemas)
2026 {
2027 Oid schemaid = lfirst_oid(lc);
2028
2029 psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
2030 Anum_pg_publication_namespace_oid,
2031 ObjectIdGetDatum(schemaid),
2032 ObjectIdGetDatum(pubid));
2033 if (!OidIsValid(psid))
2034 {
2035 if (missing_ok)
2036 continue;
2037
2038 ereport(ERROR,
2039 (errcode(ERRCODE_UNDEFINED_OBJECT),
2040 errmsg("tables from schema \"%s\" are not part of the publication",
2041 get_namespace_name(schemaid))));
2042 }
2043
2044 ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
2045 performDeletion(&obj, DROP_CASCADE, 0);
2046 }
2047}
2048
2049/*
2050 * Internal workhorse for changing a publication owner
 *
 * Does nothing when the publication is already owned by "newOwnerId".
 * Unless the current user is a superuser, the following must hold:
 *   - the current user owns the publication;
 *   - the current user is able to become the new owner;
 *   - the new owner has CREATE privilege on the current database;
 *   - the new owner is a superuser if the publication is flagged
 *     puballtables or puballsequences, or is a schema publication.
 * The owner field is then overwritten in "tup", the catalog row is updated,
 * the owner dependency is changed and the post-alter hook is invoked.
 *
 * NOTE(review): this listing omits source lines 2053 (function name and
 * parameters), 2055 (declaration of form), 2068 (first line of the
 * ownership-failure call) and 2077-2078 (the statement run when the ACL
 * check fails).
2051 */
2052static void
2054{
2056
2057 form = (Form_pg_publication) GETSTRUCT(tup);
2058
2059 if (form->pubowner == newOwnerId)
2060 return;
2061
2062 if (!superuser())
2063 {
2064 AclResult aclresult;
2065
2066 /* Must be owner */
2067 if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
2069 NameStr(form->pubname));
2070
2071 /* Must be able to become new owner */
2072 check_can_set_role(GetUserId(), newOwnerId);
2073
2074 /* New owner must have CREATE privilege on database */
2075 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
2076 if (aclresult != ACLCHECK_OK)
2079
2080 if (!superuser_arg(newOwnerId))
2081 {
2082 if (form->puballtables || form->puballsequences ||
2083 is_schema_publication(form->oid))
2084 ereport(ERROR,
2085 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2086 errmsg("permission denied to change owner of publication \"%s\"",
2087 NameStr(form->pubname)),
2088 errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
2089 }
2090 }
2091
 /* form points into tup, so this edits the tuple that is written below. */
2092 form->pubowner = newOwnerId;
2093 CatalogTupleUpdate(rel, &tup->t_self, tup);
2094
2095 /* Update owner dependency reference */
2096 changeDependencyOnOwner(PublicationRelationId,
2097 form->oid,
2098 newOwnerId);
2099
2100 InvokeObjectPostAlterHook(PublicationRelationId,
2101 form->oid, 0);
2102}
2103
2104/*
2105 * Change publication owner -- by name
 *
 * Looks the publication up by "name" (ERROR if it does not exist), hands a
 * copy of its tuple to AlterPublicationOwner_internal together with
 * "newOwnerId", and returns the publication's object address.
 *
 * NOTE(review): this listing omits source lines 2107 (the return-type line;
 * the function returns the ObjectAddress "address") and 2134.
2106 */
2108AlterPublicationOwner(const char *name, Oid newOwnerId)
2109{
2110 Oid pubid;
2111 HeapTuple tup;
2112 Relation rel;
2113 ObjectAddress address;
2114 Form_pg_publication pubform;
2115
2116 rel = table_open(PublicationRelationId, RowExclusiveLock);
2117
2118 tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2119
2120 if (!HeapTupleIsValid(tup))
2121 ereport(ERROR,
2122 (errcode(ERRCODE_UNDEFINED_OBJECT),
2123 errmsg("publication \"%s\" does not exist", name)));
2124
2125 pubform = (Form_pg_publication) GETSTRUCT(tup);
 /* Save the OID now; the tuple copy is freed before the address is returned. */
2126 pubid = pubform->oid;
2127
2128 AlterPublicationOwner_internal(rel, tup, newOwnerId);
2129
2130 ObjectAddressSet(address, PublicationRelationId, pubid);
2131
2132 heap_freetuple(tup);
2133
2135
2136 return address;
2137}
2138
2139/*
2140 * Change publication owner -- by OID
 *
 * Looks the publication up by "pubid" (ERROR if it does not exist) and hands
 * a copy of its tuple to AlterPublicationOwner_internal together with
 * "newOwnerId"; the copy is freed afterwards.
 *
 * NOTE(review): this listing omits source lines 2143 (function name and
 * parameters) and 2161.
2141 */
2142void
2144{
2145 HeapTuple tup;
2146 Relation rel;
2147
2148 rel = table_open(PublicationRelationId, RowExclusiveLock);
2149
2150 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
2151
2152 if (!HeapTupleIsValid(tup))
2153 ereport(ERROR,
2154 (errcode(ERRCODE_UNDEFINED_OBJECT),
2155 errmsg("publication with OID %u does not exist", pubid)));
2156
2157 AlterPublicationOwner_internal(rel, tup, newOwnerId);
2158
2159 heap_freetuple(tup);
2160
2162}
2163
2164/*
2165 * Extract the publish_generated_columns option value from a DefElem. "stored"
2166 * and "none" values are accepted.
 *
 * The comparison is case-insensitive.  Returns PUBLISH_GENCOLS_NONE for
 * "none" and PUBLISH_GENCOLS_STORED for "stored".  A missing argument or any
 * other value raises a syntax ERROR that names the parameter and quotes the
 * offending value (an empty string when no argument was given).
 *
 * NOTE(review): this listing omits source line 2169 (function name and
 * parameter list); the parameter is used below as "def".
2167 */
2168static char
2170{
 /* Stays empty when no argument was supplied; only used in the message. */
2171 char *sval = "";
2172
2173 /*
2174 * A parameter value is required.
2175 */
2176 if (def->arg)
2177 {
2178 sval = defGetString(def);
2179
2180 if (pg_strcasecmp(sval, "none") == 0)
2181 return PUBLISH_GENCOLS_NONE;
2182 if (pg_strcasecmp(sval, "stored") == 0)
2183 return PUBLISH_GENCOLS_STORED;
2184 }
2185
2186 ereport(ERROR,
2187 errcode(ERRCODE_SYNTAX_ERROR),
2188 errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
2189 errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
2190
2191 return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
2192}
void check_can_set_role(Oid member, Oid role)
Definition: acl.c:5341
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
@ ACLCHECK_NOT_OWNER
Definition: acl.h:185
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2654
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition: aclchk.c:3836
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition: aclchk.c:4090
int16 AttrNumber
Definition: attnum.h:21
#define InvalidAttrNumber
Definition: attnum.h:23
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:142
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1305
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
static Datum values[MAXATTR]
Definition: bootstrap.c:155
#define TextDatumGetCString(d)
Definition: builtins.h:98
#define NameStr(name)
Definition: c.h:771
#define OidIsValid(objectId)
Definition: c.h:794
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:448
char * defGetString(DefElem *def)
Definition: define.c:35
bool defGetBoolean(DefElem *def)
Definition: define.c:94
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition: define.c:371
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:274
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1243
int errdetail(const char *fmt,...)
Definition: elog.c:1216
int errhint(const char *fmt,...)
Definition: elog.c:1330
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define _(x)
Definition: elog.c:91
#define WARNING
Definition: elog.h:36
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define NOTICE
Definition: elog.h:35
#define ereport(elevel,...)
Definition: elog.h:150
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define palloc_object(type)
Definition: fe_memutils.h:74
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:684
Oid MyDatabaseId
Definition: globals.c:94
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
Definition: heaptuple.c:456
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
#define stmt
Definition: indent_codes.h:59
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition: indexing.c:313
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition: indexing.c:365
void CacheInvalidateRelSyncAll(void)
Definition: inval.c:1724
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1691
void CacheInvalidateRelSync(Oid relid)
Definition: inval.c:1712
void CacheInvalidateRelcacheAll(void)
Definition: inval.c:1658
int x
Definition: isn.c:75
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:81
List * list_concat_unique_oid(List *list1, const List *list2)
Definition: list.c:1469
List * lappend(List *list, void *datum)
Definition: list.c:339
List * list_difference_oid(const List *list1, const List *list2)
Definition: list.c:1313
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
List * list_append_unique_oid(List *list, Oid datum)
Definition: list.c:1380
void list_free(List *list)
Definition: list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
void list_free_deep(List *list)
Definition: list.c:1560
void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1008
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_rel_name(Oid relid)
Definition: lsyscache.c:2078
AttrNumber get_attnum(Oid relid, const char *attname)
Definition: lsyscache.c:934
char * get_database_name(Oid dbid)
Definition: lsyscache.c:1242
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2153
char func_volatile(Oid funcid)
Definition: lsyscache.c:1930
char * get_attname(Oid relid, AttrNumber attnum, bool missing_ok)
Definition: lsyscache.c:903
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3516
char * pstrdup(const char *in)
Definition: mcxt.c:1781
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
Oid GetUserId(void)
Definition: miscinit.c:469
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
List * fetch_search_path(bool includeImplicit)
Definition: namespace.c:4889
Oid get_namespace_oid(const char *nspname, bool missing_ok)
Definition: namespace.c:3605
Oid exprType(const Node *expr)
Definition: nodeFuncs.c:42
Oid exprInputCollation(const Node *expr)
Definition: nodeFuncs.c:1076
bool check_functions_in_node(Node *node, check_function_callback checker, void *context)
Definition: nodeFuncs.c:1909
Oid exprCollation(const Node *expr)
Definition: nodeFuncs.c:821
int exprLocation(const Node *expr)
Definition: nodeFuncs.c:1384
#define expression_tree_walker(n, w, c)
Definition: nodeFuncs.h:153
#define IsA(nodeptr, _type_)
Definition: nodes.h:164
#define copyObject(obj)
Definition: nodes.h:232
#define nodeTag(nodeptr)
Definition: nodes.h:139
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:173
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:197
ObjectType get_relkind_objtype(char relkind)
const ObjectAddress InvalidObjectAddress
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
Node * transformWhereClause(ParseState *pstate, Node *clause, ParseExprKind exprKind, const char *constructName)
void assign_expr_collations(ParseState *pstate, Node *expr)
void free_parsestate(ParseState *pstate)
Definition: parse_node.c:72
int parser_errposition(ParseState *pstate, int location)
Definition: parse_node.c:106
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:39
@ EXPR_KIND_WHERE
Definition: parse_node.h:46
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
void addNSItemToQuery(ParseState *pstate, ParseNamespaceItem *nsitem, bool addToJoinList, bool addToRelNameSpace, bool addToVarNameSpace)
@ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA
Definition: parsenodes.h:4312
@ PUBLICATIONOBJ_TABLES_IN_SCHEMA
Definition: parsenodes.h:4311
@ PUBLICATIONOBJ_TABLE
Definition: parsenodes.h:4310
@ AP_DropObjects
Definition: parsenodes.h:4356
@ AP_SetObjects
Definition: parsenodes.h:4357
@ AP_AddObjects
Definition: parsenodes.h:4355
@ DROP_CASCADE
Definition: parsenodes.h:2426
@ OBJECT_DATABASE
Definition: parsenodes.h:2361
@ OBJECT_PUBLICATION
Definition: parsenodes.h:2382
#define ACL_CREATE
Definition: parsenodes.h:85
int16 attnum
Definition: pg_attribute.h:74
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
NameData relname
Definition: pg_class.h:38
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:255
#define lfirst(lc)
Definition: pg_list.h:172
#define lfirst_node(type, lc)
Definition: pg_list.h:176
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define foreach_oid(var, lst)
Definition: pg_list.h:471
#define linitial_oid(l)
Definition: pg_list.h:180
#define lfirst_oid(lc)
Definition: pg_list.h:174
Bitmapset * pub_collist_validate(Relation targetrel, List *columns)
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
bool is_schema_publication(Oid pubid)
ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
List * GetPublicationSchemas(Oid pubid)
ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists)
List * GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
Publication * GetPublication(Oid pubid)
bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols)
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
FormData_pg_publication * Form_pg_publication
@ PUBLICATION_PART_ROOT
@ PUBLICATION_PART_ALL
FormData_pg_publication_namespace * Form_pg_publication_namespace
FormData_pg_publication_rel * Form_pg_publication_rel
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:316
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:168
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:32
static Datum BoolGetDatum(bool X)
Definition: postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
uint64_t Datum
Definition: postgres.h:70
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:380
static Datum CharGetDatum(char X)
Definition: postgres.h:132
unsigned int Oid
Definition: postgres_ext.h:32
struct rf_context rf_context
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt)
static bool contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
bool pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot, char pubgencols_type, bool *invalid_column_list, bool *invalid_gen_col)
static void AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, List *schemaidlist)
void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
void InvalidatePublicationRels(List *relids)
static void CloseTableList(List *rels)
void RemovePublicationSchemaById(Oid psoid)
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
static void TransformPubWhereClauses(List *tables, const char *queryString, bool pubviaroot)
ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
void InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, List **rels, List **schemas)
static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist)
static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context)
void RemovePublicationById(Oid pubid)
static List * OpenTableList(List *tables)
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
static bool check_simple_rowfilter_expr(Node *node, ParseState *pstate)
static char defGetGeneratedColsOption(DefElem *def)
void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root, bool *publish_generated_columns_given, char *publish_generated_columns)
void RemovePublicationRelById(Oid proid)
ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId)
static void CheckPubRelationColumnList(char *pubname, List *tables, bool publish_schema, bool pubviaroot)
static void AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, const char *queryString, bool publish_schema)
static void LockSchemaList(List *schemalist)
static bool check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
#define MAX_RELCACHE_INVAL_MSGS
void * stringToNode(const char *str)
Definition: read.c:90
#define RelationGetRelid(relation)
Definition: rel.h:515
#define RelationGetDescr(relation)
Definition: rel.h:541
#define RelationGetRelationName(relation)
Definition: rel.h:549
#define RelationGetNamespace(relation)
Definition: rel.h:556
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition: relcache.c:5298
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
Definition: relcache.h:71
Node * expand_generated_columns_in_expr(Node *node, Relation rel, int rt_index)
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
char * defname
Definition: parsenodes.h:844
Node * arg
Definition: parsenodes.h:845
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
Definition: nodes.h:135
const char * p_sourcetext
Definition: parse_node.h:195
PublicationObjSpecType pubobjtype
Definition: parsenodes.h:4320
PublicationTable * pubtable
Definition: parsenodes.h:4322
RangeVar * relation
Definition: parsenodes.h:4300
bool inh
Definition: primnodes.h:86
TupleDesc rd_att
Definition: rel.h:112
Form_pg_class rd_rel
Definition: rel.h:111
bool has_generated_virtual
Definition: tupdesc.h:47
bool has_generated_stored
Definition: tupdesc.h:46
TupleConstr * constr
Definition: tupdesc.h:141
Definition: primnodes.h:262
AttrNumber varattno
Definition: primnodes.h:274
Bitmapset * bms_replident
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
bool superuser(void)
Definition: superuser.c:46
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:220
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:595
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:230
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91
#define SearchSysCacheExists1(cacheId, key1)
Definition: syscache.h:100
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:109
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: table.c:83
#define FirstNormalObjectId
Definition: transam.h:197
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:2728
const char * name
void CommandCounterIncrement(void)
Definition: xact.c:1101
int wal_level
Definition: xlog.c:134
@ WAL_LEVEL_REPLICA
Definition: xlog.h:76