PostgreSQL Source Code  git master
publicationcmds.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * publicationcmds.c
4  * publication manipulation
5  *
6  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  * src/backend/commands/publicationcmds.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/table.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/objectaccess.h"
25 #include "catalog/objectaddress.h"
26 #include "catalog/partition.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_namespace.h"
29 #include "catalog/pg_proc.h"
30 #include "catalog/pg_publication.h"
33 #include "catalog/pg_type.h"
34 #include "commands/dbcommands.h"
35 #include "commands/defrem.h"
36 #include "commands/event_trigger.h"
38 #include "funcapi.h"
39 #include "miscadmin.h"
40 #include "nodes/nodeFuncs.h"
41 #include "parser/parse_clause.h"
42 #include "parser/parse_collate.h"
43 #include "parser/parse_relation.h"
44 #include "storage/lmgr.h"
45 #include "utils/acl.h"
46 #include "utils/array.h"
47 #include "utils/builtins.h"
48 #include "utils/catcache.h"
49 #include "utils/fmgroids.h"
50 #include "utils/inval.h"
51 #include "utils/lsyscache.h"
52 #include "utils/rel.h"
53 #include "utils/syscache.h"
54 #include "utils/varlena.h"
55 
56 /*
57  * Information used to validate the columns in the row filter expression. See
58  * contain_invalid_rfcolumn_walker for details.
59  */
60 typedef struct rf_context
61 {
62  Bitmapset *bms_replident; /* bitset of replica identity columns */
63  bool pubviaroot; /* true if we are validating the parent
64  * relation's row filter */
65  Oid relid; /* relid of the relation */
66  Oid parentid; /* relid of the parent relation */
68 
69 static List *OpenRelIdList(List *relids);
70 static List *OpenTableList(List *tables);
71 static void CloseTableList(List *rels);
72 static void LockSchemaList(List *schemalist);
73 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
74  AlterPublicationStmt *stmt);
75 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
76 static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
77  AlterPublicationStmt *stmt);
78 static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
79 
80 static void
82  List *options,
83  bool *publish_given,
84  PublicationActions *pubactions,
85  bool *publish_via_partition_root_given,
86  bool *publish_via_partition_root)
87 {
88  ListCell *lc;
89 
90  *publish_given = false;
91  *publish_via_partition_root_given = false;
92 
93  /* defaults */
94  pubactions->pubinsert = true;
95  pubactions->pubupdate = true;
96  pubactions->pubdelete = true;
97  pubactions->pubtruncate = true;
98  *publish_via_partition_root = false;
99 
100  /* Parse options */
101  foreach(lc, options)
102  {
103  DefElem *defel = (DefElem *) lfirst(lc);
104 
105  if (strcmp(defel->defname, "publish") == 0)
106  {
107  char *publish;
108  List *publish_list;
109  ListCell *lc;
110 
111  if (*publish_given)
112  errorConflictingDefElem(defel, pstate);
113 
114  /*
115  * If publish option was given only the explicitly listed actions
116  * should be published.
117  */
118  pubactions->pubinsert = false;
119  pubactions->pubupdate = false;
120  pubactions->pubdelete = false;
121  pubactions->pubtruncate = false;
122 
123  *publish_given = true;
124  publish = defGetString(defel);
125 
126  if (!SplitIdentifierString(publish, ',', &publish_list))
127  ereport(ERROR,
128  (errcode(ERRCODE_SYNTAX_ERROR),
129  errmsg("invalid list syntax for \"publish\" option")));
130 
131  /* Process the option list. */
132  foreach(lc, publish_list)
133  {
134  char *publish_opt = (char *) lfirst(lc);
135 
136  if (strcmp(publish_opt, "insert") == 0)
137  pubactions->pubinsert = true;
138  else if (strcmp(publish_opt, "update") == 0)
139  pubactions->pubupdate = true;
140  else if (strcmp(publish_opt, "delete") == 0)
141  pubactions->pubdelete = true;
142  else if (strcmp(publish_opt, "truncate") == 0)
143  pubactions->pubtruncate = true;
144  else
145  ereport(ERROR,
146  (errcode(ERRCODE_SYNTAX_ERROR),
147  errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
148  }
149  }
150  else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
151  {
152  if (*publish_via_partition_root_given)
153  errorConflictingDefElem(defel, pstate);
154  *publish_via_partition_root_given = true;
155  *publish_via_partition_root = defGetBoolean(defel);
156  }
157  else
158  ereport(ERROR,
159  (errcode(ERRCODE_SYNTAX_ERROR),
160  errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
161  }
162 }
163 
164 /*
165  * Convert the PublicationObjSpecType list into schema oid list and
166  * PublicationTable list.
167  */
168 static void
169 ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
170  List **rels, List **schemas)
171 {
172  ListCell *cell;
173  PublicationObjSpec *pubobj;
174 
175  if (!pubobjspec_list)
176  return;
177 
178  foreach(cell, pubobjspec_list)
179  {
180  Oid schemaid;
181  List *search_path;
182 
183  pubobj = (PublicationObjSpec *) lfirst(cell);
184 
185  switch (pubobj->pubobjtype)
186  {
188  *rels = lappend(*rels, pubobj->pubtable);
189  break;
191  schemaid = get_namespace_oid(pubobj->name, false);
192 
193  /* Filter out duplicates if user specifies "sch1, sch1" */
194  *schemas = list_append_unique_oid(*schemas, schemaid);
195  break;
197  search_path = fetch_search_path(false);
198  if (search_path == NIL) /* nothing valid in search_path? */
199  ereport(ERROR,
200  errcode(ERRCODE_UNDEFINED_SCHEMA),
201  errmsg("no schema has been selected for CURRENT_SCHEMA"));
202 
203  schemaid = linitial_oid(search_path);
204  list_free(search_path);
205 
206  /* Filter out duplicates if user specifies "sch1, sch1" */
207  *schemas = list_append_unique_oid(*schemas, schemaid);
208  break;
209  default:
210  /* shouldn't happen */
211  elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
212  break;
213  }
214  }
215 }
216 
217 /*
218  * Check if any of the given relation's schema is a member of the given schema
219  * list.
220  */
221 static void
223  PublicationObjSpecType checkobjtype)
224 {
225  ListCell *lc;
226 
227  foreach(lc, rels)
228  {
229  PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
230  Relation rel = pub_rel->relation;
231  Oid relSchemaId = RelationGetNamespace(rel);
232 
233  if (list_member_oid(schemaidlist, relSchemaId))
234  {
235  if (checkobjtype == PUBLICATIONOBJ_TABLES_IN_SCHEMA)
236  ereport(ERROR,
237  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
238  errmsg("cannot add schema \"%s\" to publication",
239  get_namespace_name(relSchemaId)),
240  errdetail("Table \"%s\" in schema \"%s\" is already part of the publication, adding the same schema is not supported.",
242  get_namespace_name(relSchemaId)));
243  else if (checkobjtype == PUBLICATIONOBJ_TABLE)
244  ereport(ERROR,
245  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
246  errmsg("cannot add relation \"%s.%s\" to publication",
247  get_namespace_name(relSchemaId),
249  errdetail("Table's schema \"%s\" is already part of the publication or part of the specified schema list.",
250  get_namespace_name(relSchemaId)));
251  }
252  }
253 }
254 
255 /*
256  * Returns true if any of the columns used in the row filter WHERE expression is
257  * not part of REPLICA IDENTITY, false otherwise.
258  */
259 static bool
261 {
262  if (node == NULL)
263  return false;
264 
265  if (IsA(node, Var))
266  {
267  Var *var = (Var *) node;
268  AttrNumber attnum = var->varattno;
269 
270  /*
271  * If pubviaroot is true, we are validating the row filter of the
272  * parent table, but the bitmap contains the replica identity
273  * information of the child table. So, get the column number of the
274  * child table as parent and child column order could be different.
275  */
276  if (context->pubviaroot)
277  {
278  char *colname = get_attname(context->parentid, attnum, false);
279 
280  attnum = get_attnum(context->relid, colname);
281  }
282 
284  context->bms_replident))
285  return true;
286  }
287 
289  (void *) context);
290 }
291 
292 /*
293  * Check if all columns referenced in the filter expression are part of the
294  * REPLICA IDENTITY index or not.
295  *
296  * Returns true if any invalid column is found.
297  */
298 bool
299 pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
300  bool pubviaroot)
301 {
302  HeapTuple rftuple;
303  Oid relid = RelationGetRelid(relation);
304  Oid publish_as_relid = RelationGetRelid(relation);
305  bool result = false;
306  Datum rfdatum;
307  bool rfisnull;
308 
309  /*
310  * FULL means all columns are in the REPLICA IDENTITY, so all columns are
311  * allowed in the row filter and we can skip the validation.
312  */
313  if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
314  return false;
315 
316  /*
317  * For a partition, if pubviaroot is true, find the topmost ancestor that
318  * is published via this publication as we need to use its row filter
319  * expression to filter the partition's changes.
320  *
321  * Note that even though the row filter used is for an ancestor, the
322  * REPLICA IDENTITY used will be for the actual child table.
323  */
324  if (pubviaroot && relation->rd_rel->relispartition)
325  {
326  publish_as_relid
327  = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
328 
329  if (!OidIsValid(publish_as_relid))
330  publish_as_relid = relid;
331  }
332 
334  ObjectIdGetDatum(publish_as_relid),
335  ObjectIdGetDatum(pubid));
336 
337  if (!HeapTupleIsValid(rftuple))
338  return false;
339 
340  rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
341  Anum_pg_publication_rel_prqual,
342  &rfisnull);
343 
344  if (!rfisnull)
345  {
346  rf_context context = {0};
347  Node *rfnode;
348  Bitmapset *bms = NULL;
349 
350  context.pubviaroot = pubviaroot;
351  context.parentid = publish_as_relid;
352  context.relid = relid;
353 
354  /* Remember columns that are part of the REPLICA IDENTITY */
355  bms = RelationGetIndexAttrBitmap(relation,
357 
358  context.bms_replident = bms;
359  rfnode = stringToNode(TextDatumGetCString(rfdatum));
360  result = contain_invalid_rfcolumn_walker(rfnode, &context);
361  }
362 
363  ReleaseSysCache(rftuple);
364 
365  return result;
366 }
367 
368 /*
369  * Check if all columns referenced in the REPLICA IDENTITY are covered by
370  * the column list.
371  *
372  * Returns true if any replica identity column is not covered by column list.
373  */
374 bool
376  bool pubviaroot)
377 {
378  HeapTuple tuple;
379  Oid relid = RelationGetRelid(relation);
380  Oid publish_as_relid = RelationGetRelid(relation);
381  bool result = false;
382  Datum datum;
383  bool isnull;
384 
385  /*
386  * For a partition, if pubviaroot is true, find the topmost ancestor that
387  * is published via this publication as we need to use its column list for
388  * the changes.
389  *
390  * Note that even though the column list used is for an ancestor, the
391  * REPLICA IDENTITY used will be for the actual child table.
392  */
393  if (pubviaroot && relation->rd_rel->relispartition)
394  {
395  publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
396 
397  if (!OidIsValid(publish_as_relid))
398  publish_as_relid = relid;
399  }
400 
402  ObjectIdGetDatum(publish_as_relid),
403  ObjectIdGetDatum(pubid));
404 
405  if (!HeapTupleIsValid(tuple))
406  return false;
407 
408  datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
409  Anum_pg_publication_rel_prattrs,
410  &isnull);
411 
412  if (!isnull)
413  {
414  int x;
415  Bitmapset *idattrs;
416  Bitmapset *columns = NULL;
417 
418  /* With REPLICA IDENTITY FULL, no column list is allowed. */
419  if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
420  result = true;
421 
422  /* Transform the column list datum to a bitmapset. */
423  columns = pub_collist_to_bitmapset(NULL, datum, NULL);
424 
425  /* Remember columns that are part of the REPLICA IDENTITY */
426  idattrs = RelationGetIndexAttrBitmap(relation,
428 
429  /*
430  * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
431  * offset (to handle system columns the usual way), while column list
432  * does not use offset, so we can't do bms_is_subset(). Instead, we
433  * have to loop over the idattrs and check all of them are in the
434  * list.
435  */
436  x = -1;
437  while ((x = bms_next_member(idattrs, x)) >= 0)
438  {
440 
441  /*
442  * If pubviaroot is true, we are validating the column list of the
443  * parent table, but the bitmap contains the replica identity
444  * information of the child table. The parent/child attnums may
445  * not match, so translate them to the parent - get the attname
446  * from the child, and look it up in the parent.
447  */
448  if (pubviaroot)
449  {
450  /* attribute name in the child table */
451  char *colname = get_attname(relid, attnum, false);
452 
453  /*
454  * Determine the attnum for the attribute name in parent (we
455  * are using the column list defined on the parent).
456  */
457  attnum = get_attnum(publish_as_relid, colname);
458  }
459 
460  /* replica identity column, not covered by the column list */
461  if (!bms_is_member(attnum, columns))
462  {
463  result = true;
464  break;
465  }
466  }
467 
468  bms_free(idattrs);
469  bms_free(columns);
470  }
471 
472  ReleaseSysCache(tuple);
473 
474  return result;
475 }
476 
477 /* check_functions_in_node callback */
478 static bool
480 {
481  return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
482  func_id >= FirstNormalObjectId);
483 }
484 
485 /*
486  * Check if the node contains any unallowed object. See
487  * check_simple_rowfilter_expr_walker.
488  *
489  * Returns the error detail message in errdetail_msg for unallowed expressions.
490  */
491 static void
492 expr_allowed_in_node(Node *node, ParseState *pstate, char **errdetail_msg)
493 {
494  if (IsA(node, List))
495  {
496  /*
497  * OK, we don't need to perform other expr checks for List nodes
498  * because those are undefined for List.
499  */
500  return;
501  }
502 
503  if (exprType(node) >= FirstNormalObjectId)
504  *errdetail_msg = _("User-defined types are not allowed.");
506  (void *) pstate))
507  *errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
508  else if (exprCollation(node) >= FirstNormalObjectId ||
510  *errdetail_msg = _("User-defined collations are not allowed.");
511 }
512 
513 /*
514  * The row filter walker checks if the row filter expression is a "simple
515  * expression".
516  *
517  * It allows only simple or compound expressions such as:
518  * - (Var Op Const)
519  * - (Var Op Var)
520  * - (Var Op Const) AND/OR (Var Op Const)
521  * - etc
522  * (where Var is a column of the table this filter belongs to)
523  *
524  * The simple expression has the following restrictions:
525  * - User-defined operators are not allowed;
526  * - User-defined functions are not allowed;
527  * - User-defined types are not allowed;
528  * - User-defined collations are not allowed;
529  * - Non-immutable built-in functions are not allowed;
530  * - System columns are not allowed.
531  *
532  * NOTES
533  *
534  * We don't allow user-defined functions/operators/types/collations because
535  * (a) if a user drops a user-defined object used in a row filter expression or
536  * if there is any other error while using it, the logical decoding
537  * infrastructure won't be able to recover from such an error even if the
538  * object is recreated again because a historic snapshot is used to evaluate
539  * the row filter;
540  * (b) a user-defined function can be used to access tables that could have
541  * unpleasant results because a historic snapshot is used. That's why only
542  * immutable built-in functions are allowed in row filter expressions.
543  *
544  * We don't allow system columns because currently, we don't have that
545  * information in the tuple passed to downstream. Also, as we don't replicate
546  * those to subscribers, there doesn't seem to be a need for a filter on those
547  * columns.
548  *
549  * We can allow other node types after more analysis and testing.
550  */
551 static bool
553 {
554  char *errdetail_msg = NULL;
555 
556  if (node == NULL)
557  return false;
558 
559  switch (nodeTag(node))
560  {
561  case T_Var:
562  /* System columns are not allowed. */
563  if (((Var *) node)->varattno < InvalidAttrNumber)
564  errdetail_msg = _("System columns are not allowed.");
565  break;
566  case T_OpExpr:
567  case T_DistinctExpr:
568  case T_NullIfExpr:
569  /* OK, except user-defined operators are not allowed. */
570  if (((OpExpr *) node)->opno >= FirstNormalObjectId)
571  errdetail_msg = _("User-defined operators are not allowed.");
572  break;
573  case T_ScalarArrayOpExpr:
574  /* OK, except user-defined operators are not allowed. */
575  if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
576  errdetail_msg = _("User-defined operators are not allowed.");
577 
578  /*
579  * We don't need to check the hashfuncid and negfuncid of
580  * ScalarArrayOpExpr as those functions are only built for a
581  * subquery.
582  */
583  break;
584  case T_RowCompareExpr:
585  {
586  ListCell *opid;
587 
588  /* OK, except user-defined operators are not allowed. */
589  foreach(opid, ((RowCompareExpr *) node)->opnos)
590  {
591  if (lfirst_oid(opid) >= FirstNormalObjectId)
592  {
593  errdetail_msg = _("User-defined operators are not allowed.");
594  break;
595  }
596  }
597  }
598  break;
599  case T_Const:
600  case T_FuncExpr:
601  case T_BoolExpr:
602  case T_RelabelType:
603  case T_CollateExpr:
604  case T_CaseExpr:
605  case T_CaseTestExpr:
606  case T_ArrayExpr:
607  case T_RowExpr:
608  case T_CoalesceExpr:
609  case T_MinMaxExpr:
610  case T_XmlExpr:
611  case T_NullTest:
612  case T_BooleanTest:
613  case T_List:
614  /* OK, supported */
615  break;
616  default:
617  errdetail_msg = _("Expressions only allow columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions.");
618  break;
619  }
620 
621  /*
622  * For all the supported nodes, check the types, functions, and collations
623  * used in the nodes.
624  */
625  if (!errdetail_msg)
626  expr_allowed_in_node(node, pstate, &errdetail_msg);
627 
628  if (errdetail_msg)
629  ereport(ERROR,
630  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
631  errmsg("invalid publication WHERE expression"),
632  errdetail("%s", errdetail_msg),
633  parser_errposition(pstate, exprLocation(node))));
634 
636  (void *) pstate);
637 }
638 
639 /*
640  * Check if the row filter expression is a "simple expression".
641  *
642  * See check_simple_rowfilter_expr_walker for details.
643  */
644 static bool
646 {
647  return check_simple_rowfilter_expr_walker(node, pstate);
648 }
649 
650 /*
651  * Transform the publication WHERE expression for all the relations in the list,
652  * ensuring it is coerced to boolean and necessary collation information is
653  * added if required, and add a new nsitem/RTE for the associated relation to
654  * the ParseState's namespace list.
655  *
656  * Also check the publication row filter expression and throw an error if
657  * anything not permitted or unexpected is encountered.
658  */
659 static void
660 TransformPubWhereClauses(List *tables, const char *queryString,
661  bool pubviaroot)
662 {
663  ListCell *lc;
664 
665  foreach(lc, tables)
666  {
667  ParseNamespaceItem *nsitem;
668  Node *whereclause = NULL;
669  ParseState *pstate;
671 
672  if (pri->whereClause == NULL)
673  continue;
674 
675  /*
676  * If the publication doesn't publish changes via the root partitioned
677  * table, the partition's row filter will be used. So disallow using
678  * WHERE clause on partitioned table in this case.
679  */
680  if (!pubviaroot &&
681  pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
682  ereport(ERROR,
683  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
684  errmsg("cannot use publication WHERE clause for relation \"%s\"",
686  errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
687  "publish_via_partition_root")));
688 
689  pstate = make_parsestate(NULL);
690  pstate->p_sourcetext = queryString;
691 
692  nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
693  AccessShareLock, NULL,
694  false, false);
695 
696  addNSItemToQuery(pstate, nsitem, false, true, true);
697 
698  whereclause = transformWhereClause(pstate,
699  copyObject(pri->whereClause),
701  "PUBLICATION WHERE");
702 
703  /* Fix up collation information */
704  assign_expr_collations(pstate, whereclause);
705 
706  /*
707  * We allow only simple expressions in row filters. See
708  * check_simple_rowfilter_expr_walker.
709  */
710  check_simple_rowfilter_expr(whereclause, pstate);
711 
712  free_parsestate(pstate);
713 
714  pri->whereClause = whereclause;
715  }
716 }
717 
718 
719 /*
720  * Check the publication column lists expression for all relations in the list.
721  */
722 static void
723 CheckPubRelationColumnList(List *tables, const char *queryString,
724  bool pubviaroot)
725 {
726  ListCell *lc;
727 
728  foreach(lc, tables)
729  {
731 
732  if (pri->columns == NIL)
733  continue;
734 
735  /*
736  * If the publication doesn't publish changes via the root partitioned
737  * table, the partition's column list will be used. So disallow using
738  * the column list on partitioned table in this case.
739  */
740  if (!pubviaroot &&
741  pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
742  ereport(ERROR,
743  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
744  errmsg("cannot use publication column list for relation \"%s\"",
746  errdetail("column list cannot be used for a partitioned table when %s is false.",
747  "publish_via_partition_root")));
748  }
749 }
750 
751 /*
752  * Create new publication.
753  */
756 {
757  Relation rel;
758  ObjectAddress myself;
759  Oid puboid;
760  bool nulls[Natts_pg_publication];
761  Datum values[Natts_pg_publication];
762  HeapTuple tup;
763  bool publish_given;
764  PublicationActions pubactions;
765  bool publish_via_partition_root_given;
766  bool publish_via_partition_root;
767  AclResult aclresult;
768  List *relations = NIL;
769  List *schemaidlist = NIL;
770 
771  /* must have CREATE privilege on database */
773  if (aclresult != ACLCHECK_OK)
774  aclcheck_error(aclresult, OBJECT_DATABASE,
776 
777  /* FOR ALL TABLES requires superuser */
778  if (stmt->for_all_tables && !superuser())
779  ereport(ERROR,
780  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
781  errmsg("must be superuser to create FOR ALL TABLES publication")));
782 
783  rel = table_open(PublicationRelationId, RowExclusiveLock);
784 
785  /* Check if name is used */
786  puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
787  CStringGetDatum(stmt->pubname));
788  if (OidIsValid(puboid))
789  {
790  ereport(ERROR,
792  errmsg("publication \"%s\" already exists",
793  stmt->pubname)));
794  }
795 
796  /* Form a tuple. */
797  memset(values, 0, sizeof(values));
798  memset(nulls, false, sizeof(nulls));
799 
800  values[Anum_pg_publication_pubname - 1] =
802  values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
803 
805  stmt->options,
806  &publish_given, &pubactions,
807  &publish_via_partition_root_given,
808  &publish_via_partition_root);
809 
810  puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
811  Anum_pg_publication_oid);
812  values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
813  values[Anum_pg_publication_puballtables - 1] =
815  values[Anum_pg_publication_pubinsert - 1] =
816  BoolGetDatum(pubactions.pubinsert);
817  values[Anum_pg_publication_pubupdate - 1] =
818  BoolGetDatum(pubactions.pubupdate);
819  values[Anum_pg_publication_pubdelete - 1] =
820  BoolGetDatum(pubactions.pubdelete);
821  values[Anum_pg_publication_pubtruncate - 1] =
822  BoolGetDatum(pubactions.pubtruncate);
823  values[Anum_pg_publication_pubviaroot - 1] =
824  BoolGetDatum(publish_via_partition_root);
825 
826  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
827 
828  /* Insert tuple into catalog. */
829  CatalogTupleInsert(rel, tup);
830  heap_freetuple(tup);
831 
832  recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
833 
834  ObjectAddressSet(myself, PublicationRelationId, puboid);
835 
836  /* Make the changes visible. */
838 
839  /* Associate objects with the publication. */
840  if (stmt->for_all_tables)
841  {
842  /* Invalidate relcache so that publication info is rebuilt. */
844  }
845  else
846  {
847  ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
848  &schemaidlist);
849 
850  /* FOR ALL TABLES IN SCHEMA requires superuser */
851  if (list_length(schemaidlist) > 0 && !superuser())
852  ereport(ERROR,
853  errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
854  errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication"));
855 
856  if (list_length(relations) > 0)
857  {
858  List *rels;
859 
860  rels = OpenTableList(relations);
861  CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
863 
865  publish_via_partition_root);
866 
868  publish_via_partition_root);
869 
870  PublicationAddTables(puboid, rels, true, NULL);
871  CloseTableList(rels);
872  }
873 
874  if (list_length(schemaidlist) > 0)
875  {
876  /*
877  * Schema lock is held until the publication is created to prevent
878  * concurrent schema deletion.
879  */
880  LockSchemaList(schemaidlist);
881  PublicationAddSchemas(puboid, schemaidlist, true, NULL);
882  }
883  }
884 
886 
887  InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
888 
890  {
892  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
893  errmsg("wal_level is insufficient to publish logical changes"),
894  errhint("Set wal_level to logical before creating subscriptions.")));
895  }
896 
897  return myself;
898 }
899 
900 /*
901  * Change options of a publication.
902  */
903 static void
905  Relation rel, HeapTuple tup)
906 {
907  bool nulls[Natts_pg_publication];
908  bool replaces[Natts_pg_publication];
909  Datum values[Natts_pg_publication];
910  bool publish_given;
911  PublicationActions pubactions;
912  bool publish_via_partition_root_given;
913  bool publish_via_partition_root;
914  ObjectAddress obj;
915  Form_pg_publication pubform;
916  List *root_relids = NIL;
917  ListCell *lc;
918 
920  stmt->options,
921  &publish_given, &pubactions,
922  &publish_via_partition_root_given,
923  &publish_via_partition_root);
924 
925  pubform = (Form_pg_publication) GETSTRUCT(tup);
926 
927  /*
928  * If the publication doesn't publish changes via the root partitioned
929  * table, the partition's row filter and column list will be used. So
930  * disallow using WHERE clause and column lists on partitioned table in
931  * this case.
932  */
933  if (!pubform->puballtables && publish_via_partition_root_given &&
934  !publish_via_partition_root)
935  {
936  /*
937  * Lock the publication so nobody else can do anything with it. This
938  * prevents concurrent alter to add partitioned table(s) with WHERE
939  * clause(s) and/or column lists which we don't allow when not
940  * publishing via root.
941  */
942  LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
944 
945  root_relids = GetPublicationRelations(pubform->oid,
947 
948  foreach(lc, root_relids)
949  {
950  Oid relid = lfirst_oid(lc);
951  HeapTuple rftuple;
952  char relkind;
953  char *relname;
954  bool has_rowfilter;
955  bool has_collist;
956 
957  /*
958  * Beware: we don't have lock on the relations, so cope silently
959  * with the cache lookups returning NULL.
960  */
961 
963  ObjectIdGetDatum(relid),
964  ObjectIdGetDatum(pubform->oid));
965  if (!HeapTupleIsValid(rftuple))
966  continue;
967  has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
968  has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
969  if (!has_rowfilter && !has_collist)
970  {
971  ReleaseSysCache(rftuple);
972  continue;
973  }
974 
975  relkind = get_rel_relkind(relid);
976  if (relkind != RELKIND_PARTITIONED_TABLE)
977  {
978  ReleaseSysCache(rftuple);
979  continue;
980  }
981  relname = get_rel_name(relid);
982  if (relname == NULL) /* table concurrently dropped */
983  {
984  ReleaseSysCache(rftuple);
985  continue;
986  }
987 
988  if (has_rowfilter)
989  ereport(ERROR,
990  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
991  errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
992  "publish_via_partition_root",
993  stmt->pubname),
994  errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
995  relname, "publish_via_partition_root")));
996  Assert(has_collist);
997  ereport(ERROR,
998  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
999  errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1000  "publish_via_partition_root",
1001  stmt->pubname),
1002  errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1003  relname, "publish_via_partition_root")));
1004  }
1005  }
1006 
1007  /* Everything ok, form a new tuple. */
1008  memset(values, 0, sizeof(values));
1009  memset(nulls, false, sizeof(nulls));
1010  memset(replaces, false, sizeof(replaces));
1011 
1012  if (publish_given)
1013  {
1014  values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1015  replaces[Anum_pg_publication_pubinsert - 1] = true;
1016 
1017  values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1018  replaces[Anum_pg_publication_pubupdate - 1] = true;
1019 
1020  values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1021  replaces[Anum_pg_publication_pubdelete - 1] = true;
1022 
1023  values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1024  replaces[Anum_pg_publication_pubtruncate - 1] = true;
1025  }
1026 
1027  if (publish_via_partition_root_given)
1028  {
1029  values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1030  replaces[Anum_pg_publication_pubviaroot - 1] = true;
1031  }
1032 
1033  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1034  replaces);
1035 
1036  /* Update the catalog. */
1037  CatalogTupleUpdate(rel, &tup->t_self, tup);
1038 
1040 
1041  pubform = (Form_pg_publication) GETSTRUCT(tup);
1042 
1043  /* Invalidate the relcache. */
1044  if (pubform->puballtables)
1045  {
1047  }
1048  else
1049  {
1050  List *relids = NIL;
1051  List *schemarelids = NIL;
1052 
1053  /*
1054  * For any partitioned tables contained in the publication, we must
1055  * invalidate all partitions contained in the respective partition
1056  * trees, not just those explicitly mentioned in the publication.
1057  */
1058  if (root_relids == NIL)
1059  relids = GetPublicationRelations(pubform->oid,
1061  else
1062  {
1063  /*
1064  * We already got tables explicitly mentioned in the publication.
1065  * Now get all partitions for the partitioned table in the list.
1066  */
1067  foreach(lc, root_relids)
1068  relids = GetPubPartitionOptionRelations(relids,
1070  lfirst_oid(lc));
1071  }
1072 
1073  schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1075  relids = list_concat_unique_oid(relids, schemarelids);
1076 
1077  InvalidatePublicationRels(relids);
1078  }
1079 
1080  ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1082  (Node *) stmt);
1083 
1084  InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1085 }
1086 
1087 /*
1088  * Invalidate the relations.
1089  */
1090 void
1092 {
1093  /*
1094  * We don't want to send too many individual messages, at some point it's
1095  * cheaper to just reset whole relcache.
1096  */
1097  if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1098  {
1099  ListCell *lc;
1100 
1101  foreach(lc, relids)
1103  }
1104  else
1106 }
1107 
1108 /*
1109  * Add or remove table to/from publication.
1110  */
1111 static void
1113  List *tables, List *schemaidlist,
1114  const char *queryString)
1115 {
1116  List *rels = NIL;
1118  Oid pubid = pubform->oid;
1119 
1120  /*
1121  * Nothing to do if no objects, except in SET: for that it is quite
1122  * possible that user has not specified any tables in which case we need
1123  * to remove all the existing tables.
1124  */
1125  if (!tables && stmt->action != AP_SetObjects)
1126  return;
1127 
1128  rels = OpenTableList(tables);
1129 
1130  if (stmt->action == AP_AddObjects)
1131  {
1132  List *schemas = NIL;
1133 
1134  /*
1135  * Check if the relation is member of the existing schema in the
1136  * publication or member of the schema list specified.
1137  */
1138  schemas = list_concat_copy(schemaidlist, GetPublicationSchemas(pubid));
1141 
1142  TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1143 
1144  CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot);
1145 
1146  PublicationAddTables(pubid, rels, false, stmt);
1147  }
1148  else if (stmt->action == AP_DropObjects)
1149  PublicationDropTables(pubid, rels, false);
1150  else /* AP_SetObjects */
1151  {
1152  List *oldrelids = GetPublicationRelations(pubid,
1154  List *delrels = NIL;
1155  ListCell *oldlc;
1156 
1157  CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
1159 
1160  TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1161 
1162  CheckPubRelationColumnList(rels, queryString, pubform->pubviaroot);
1163 
1164  /*
1165  * To recreate the relation list for the publication, look for
1166  * existing relations that do not need to be dropped.
1167  */
1168  foreach(oldlc, oldrelids)
1169  {
1170  Oid oldrelid = lfirst_oid(oldlc);
1171  ListCell *newlc;
1172  PublicationRelInfo *oldrel;
1173  bool found = false;
1174  HeapTuple rftuple;
1175  Node *oldrelwhereclause = NULL;
1176  Bitmapset *oldcolumns = NULL;
1177 
1178  /* look up the cache for the old relmap */
1180  ObjectIdGetDatum(oldrelid),
1181  ObjectIdGetDatum(pubid));
1182 
1183  /*
1184  * See if the existing relation currently has a WHERE clause or a
1185  * column list. We need to compare those too.
1186  */
1187  if (HeapTupleIsValid(rftuple))
1188  {
1189  bool isnull = true;
1190  Datum whereClauseDatum;
1191  Datum columnListDatum;
1192 
1193  /* Load the WHERE clause for this table. */
1194  whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1195  Anum_pg_publication_rel_prqual,
1196  &isnull);
1197  if (!isnull)
1198  oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1199 
1200  /* Transform the int2vector column list to a bitmap. */
1201  columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1202  Anum_pg_publication_rel_prattrs,
1203  &isnull);
1204 
1205  if (!isnull)
1206  oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1207 
1208  ReleaseSysCache(rftuple);
1209  }
1210 
1211  foreach(newlc, rels)
1212  {
1213  PublicationRelInfo *newpubrel;
1214  Oid newrelid;
1215  Bitmapset *newcolumns = NULL;
1216 
1217  newpubrel = (PublicationRelInfo *) lfirst(newlc);
1218  newrelid = RelationGetRelid(newpubrel->relation);
1219 
1220  /*
1221  * If the new publication has column list, transform it to a
1222  * bitmap too.
1223  */
1224  if (newpubrel->columns)
1225  {
1226  ListCell *lc;
1227 
1228  foreach(lc, newpubrel->columns)
1229  {
1230  char *colname = strVal(lfirst(lc));
1231  AttrNumber attnum = get_attnum(newrelid, colname);
1232 
1233  newcolumns = bms_add_member(newcolumns, attnum);
1234  }
1235  }
1236 
1237  /*
1238  * Check if any of the new set of relations matches with the
1239  * existing relations in the publication. Additionally, if the
1240  * relation has an associated WHERE clause, check the WHERE
1241  * expressions also match. Same for the column list. Drop the
1242  * rest.
1243  */
1244  if (RelationGetRelid(newpubrel->relation) == oldrelid)
1245  {
1246  if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1247  bms_equal(oldcolumns, newcolumns))
1248  {
1249  found = true;
1250  break;
1251  }
1252  }
1253  }
1254 
1255  /*
1256  * Add the non-matched relations to a list so that they can be
1257  * dropped.
1258  */
1259  if (!found)
1260  {
1261  oldrel = palloc(sizeof(PublicationRelInfo));
1262  oldrel->whereClause = NULL;
1263  oldrel->columns = NIL;
1264  oldrel->relation = table_open(oldrelid,
1266  delrels = lappend(delrels, oldrel);
1267  }
1268  }
1269 
1270  /* And drop them. */
1271  PublicationDropTables(pubid, delrels, true);
1272 
1273  /*
1274  * Don't bother calculating the difference for adding, we'll catch and
1275  * skip existing ones when doing catalog update.
1276  */
1277  PublicationAddTables(pubid, rels, true, stmt);
1278 
1279  CloseTableList(delrels);
1280  }
1281 
1282  CloseTableList(rels);
1283 }
1284 
1285 /*
1286  * Alter the publication schemas.
1287  *
1288  * Add or remove schemas to/from publication.
1289  */
1290 static void
1292  HeapTuple tup, List *schemaidlist)
1293 {
1295 
1296  /*
1297  * Nothing to do if no objects, except in SET: for that it is quite
1298  * possible that user has not specified any schemas in which case we need
1299  * to remove all the existing schemas.
1300  */
1301  if (!schemaidlist && stmt->action != AP_SetObjects)
1302  return;
1303 
1304  /*
1305  * Schema lock is held until the publication is altered to prevent
1306  * concurrent schema deletion.
1307  */
1308  LockSchemaList(schemaidlist);
1309  if (stmt->action == AP_AddObjects)
1310  {
1311  List *rels;
1312  List *reloids;
1313 
1314  reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1315  rels = OpenRelIdList(reloids);
1316 
1317  CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
1319 
1320  CloseTableList(rels);
1321  PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1322  }
1323  else if (stmt->action == AP_DropObjects)
1324  PublicationDropSchemas(pubform->oid, schemaidlist, false);
1325  else /* AP_SetObjects */
1326  {
1327  List *oldschemaids = GetPublicationSchemas(pubform->oid);
1328  List *delschemas = NIL;
1329 
1330  /* Identify which schemas should be dropped */
1331  delschemas = list_difference_oid(oldschemaids, schemaidlist);
1332 
1333  /*
1334  * Schema lock is held until the publication is altered to prevent
1335  * concurrent schema deletion.
1336  */
1337  LockSchemaList(delschemas);
1338 
1339  /* And drop them */
1340  PublicationDropSchemas(pubform->oid, delschemas, true);
1341 
1342  /*
1343  * Don't bother calculating the difference for adding, we'll catch and
1344  * skip existing ones when doing catalog update.
1345  */
1346  PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1347  }
1348 }
1349 
1350 /*
1351  * Check if relations and schemas can be in a given publication and throw
1352  * appropriate error if not.
1353  */
1354 static void
1356  List *tables, List *schemaidlist)
1357 {
1359 
1360  if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1361  schemaidlist && !superuser())
1362  ereport(ERROR,
1363  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1364  errmsg("must be superuser to add or set schemas")));
1365 
1366  /*
1367  * Check that user is allowed to manipulate the publication tables in
1368  * schema
1369  */
1370  if (schemaidlist && pubform->puballtables)
1371  ereport(ERROR,
1372  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1373  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1374  NameStr(pubform->pubname)),
1375  errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications.")));
1376 
1377  /* Check that user is allowed to manipulate the publication tables. */
1378  if (tables && pubform->puballtables)
1379  ereport(ERROR,
1380  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1381  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1382  NameStr(pubform->pubname)),
1383  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1384 }
1385 
1386 /*
1387  * Alter the existing publication.
1388  *
1389  * This is dispatcher function for AlterPublicationOptions,
1390  * AlterPublicationSchemas and AlterPublicationTables.
1391  */
1392 void
1394 {
1395  Relation rel;
1396  HeapTuple tup;
1397  Form_pg_publication pubform;
1398 
1399  rel = table_open(PublicationRelationId, RowExclusiveLock);
1400 
1402  CStringGetDatum(stmt->pubname));
1403 
1404  if (!HeapTupleIsValid(tup))
1405  ereport(ERROR,
1406  (errcode(ERRCODE_UNDEFINED_OBJECT),
1407  errmsg("publication \"%s\" does not exist",
1408  stmt->pubname)));
1409 
1410  pubform = (Form_pg_publication) GETSTRUCT(tup);
1411 
1412  /* must be owner */
1413  if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
1415  stmt->pubname);
1416 
1417  if (stmt->options)
1418  AlterPublicationOptions(pstate, stmt, rel, tup);
1419  else
1420  {
1421  List *relations = NIL;
1422  List *schemaidlist = NIL;
1423  Oid pubid = pubform->oid;
1424 
1425  ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1426  &schemaidlist);
1427 
1428  CheckAlterPublication(stmt, tup, relations, schemaidlist);
1429 
1430  heap_freetuple(tup);
1431 
1432  /*
1433  * Lock the publication so nobody else can do anything with it. This
1434  * prevents concurrent alter to add table(s) that were already going
1435  * to become part of the publication by adding corresponding schema(s)
1436  * via this command and similarly it will prevent the concurrent
1437  * addition of schema(s) for which there is any corresponding table
1438  * being added by this command.
1439  */
1440  LockDatabaseObject(PublicationRelationId, pubid, 0,
1442 
1443  /*
1444  * It is possible that by the time we acquire the lock on publication,
1445  * concurrent DDL has removed it. We can test this by checking the
1446  * existence of publication. We get the tuple again to avoid the risk
1447  * of any publication option getting changed.
1448  */
1450  if (!HeapTupleIsValid(tup))
1451  ereport(ERROR,
1452  errcode(ERRCODE_UNDEFINED_OBJECT),
1453  errmsg("publication \"%s\" does not exist",
1454  stmt->pubname));
1455 
1456  AlterPublicationTables(stmt, tup, relations, schemaidlist,
1457  pstate->p_sourcetext);
1458  AlterPublicationSchemas(stmt, tup, schemaidlist);
1459  }
1460 
1461  /* Cleanup. */
1462  heap_freetuple(tup);
1464 }
1465 
1466 /*
1467  * Remove relation from publication by mapping OID.
1468  */
1469 void
1471 {
1472  Relation rel;
1473  HeapTuple tup;
1474  Form_pg_publication_rel pubrel;
1475  List *relids = NIL;
1476 
1477  rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1478 
1480 
1481  if (!HeapTupleIsValid(tup))
1482  elog(ERROR, "cache lookup failed for publication table %u",
1483  proid);
1484 
1485  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1486 
1487  /*
1488  * Invalidate relcache so that publication info is rebuilt.
1489  *
1490  * For the partitioned tables, we must invalidate all partitions contained
1491  * in the respective partition hierarchies, not just the one explicitly
1492  * mentioned in the publication. This is required because we implicitly
1493  * publish the child tables when the parent table is published.
1494  */
1496  pubrel->prrelid);
1497 
1498  InvalidatePublicationRels(relids);
1499 
1500  CatalogTupleDelete(rel, &tup->t_self);
1501 
1502  ReleaseSysCache(tup);
1503 
1505 }
1506 
1507 /*
1508  * Remove the publication by mapping OID.
1509  */
1510 void
1512 {
1513  Relation rel;
1514  HeapTuple tup;
1515  Form_pg_publication pubform;
1516 
1517  rel = table_open(PublicationRelationId, RowExclusiveLock);
1518 
1520  if (!HeapTupleIsValid(tup))
1521  elog(ERROR, "cache lookup failed for publication %u", pubid);
1522 
1523  pubform = (Form_pg_publication) GETSTRUCT(tup);
1524 
1525  /* Invalidate relcache so that publication info is rebuilt. */
1526  if (pubform->puballtables)
1528 
1529  CatalogTupleDelete(rel, &tup->t_self);
1530 
1531  ReleaseSysCache(tup);
1532 
1534 }
1535 
1536 /*
1537  * Remove schema from publication by mapping OID.
1538  */
1539 void
1541 {
1542  Relation rel;
1543  HeapTuple tup;
1544  List *schemaRels = NIL;
1546 
1547  rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1548 
1550 
1551  if (!HeapTupleIsValid(tup))
1552  elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1553 
1554  pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1555 
1556  /*
1557  * Invalidate relcache so that publication info is rebuilt. See
1558  * RemovePublicationRelById for why we need to consider all the
1559  * partitions.
1560  */
1561  schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1563  InvalidatePublicationRels(schemaRels);
1564 
1565  CatalogTupleDelete(rel, &tup->t_self);
1566 
1567  ReleaseSysCache(tup);
1568 
1570 }
1571 
1572 /*
1573  * Open relations specified by a relid list.
1574  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1575  * add them to a publication.
1576  */
1577 static List *
1579 {
1580  ListCell *lc;
1581  List *rels = NIL;
1582 
1583  foreach(lc, relids)
1584  {
1585  PublicationRelInfo *pub_rel;
1586  Oid relid = lfirst_oid(lc);
1587  Relation rel = table_open(relid,
1589 
1590  pub_rel = palloc(sizeof(PublicationRelInfo));
1591  pub_rel->relation = rel;
1592  rels = lappend(rels, pub_rel);
1593  }
1594 
1595  return rels;
1596 }
1597 
1598 /*
1599  * Open relations specified by a PublicationTable list.
1600  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1601  * add them to a publication.
1602  */
1603 static List *
1605 {
1606  List *relids = NIL;
1607  List *rels = NIL;
1608  ListCell *lc;
1609  List *relids_with_rf = NIL;
1610  List *relids_with_collist = NIL;
1611 
1612  /*
1613  * Open, share-lock, and check all the explicitly-specified relations
1614  */
1615  foreach(lc, tables)
1616  {
1618  bool recurse = t->relation->inh;
1619  Relation rel;
1620  Oid myrelid;
1621  PublicationRelInfo *pub_rel;
1622 
1623  /* Allow query cancel in case this takes a long time */
1625 
1627  myrelid = RelationGetRelid(rel);
1628 
1629  /*
1630  * Filter out duplicates if user specifies "foo, foo".
1631  *
1632  * Note that this algorithm is known to not be very efficient (O(N^2))
1633  * but given that it only works on list of tables given to us by user
1634  * it's deemed acceptable.
1635  */
1636  if (list_member_oid(relids, myrelid))
1637  {
1638  /* Disallow duplicate tables if there are any with row filters. */
1639  if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1640  ereport(ERROR,
1642  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1643  RelationGetRelationName(rel))));
1644 
1645  /* Disallow duplicate tables if there are any with column lists. */
1646  if (t->columns || list_member_oid(relids_with_collist, myrelid))
1647  ereport(ERROR,
1649  errmsg("conflicting or redundant column lists for table \"%s\"",
1650  RelationGetRelationName(rel))));
1651 
1653  continue;
1654  }
1655 
1656  pub_rel = palloc(sizeof(PublicationRelInfo));
1657  pub_rel->relation = rel;
1658  pub_rel->whereClause = t->whereClause;
1659  pub_rel->columns = t->columns;
1660  rels = lappend(rels, pub_rel);
1661  relids = lappend_oid(relids, myrelid);
1662 
1663  if (t->whereClause)
1664  relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1665 
1666  if (t->columns)
1667  relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1668 
1669  /*
1670  * Add children of this rel, if requested, so that they too are added
1671  * to the publication. A partitioned table can't have any inheritance
1672  * children other than its partitions, which need not be explicitly
1673  * added to the publication.
1674  */
1675  if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1676  {
1677  List *children;
1678  ListCell *child;
1679 
1680  children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1681  NULL);
1682 
1683  foreach(child, children)
1684  {
1685  Oid childrelid = lfirst_oid(child);
1686 
1687  /* Allow query cancel in case this takes a long time */
1689 
1690  /*
1691  * Skip duplicates if user specified both parent and child
1692  * tables.
1693  */
1694  if (list_member_oid(relids, childrelid))
1695  {
1696  /*
1697  * We don't allow to specify row filter for both parent
1698  * and child table at the same time as it is not very
1699  * clear which one should be given preference.
1700  */
1701  if (childrelid != myrelid &&
1702  (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1703  ereport(ERROR,
1705  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1706  RelationGetRelationName(rel))));
1707 
1708  /*
1709  * We don't allow to specify column list for both parent
1710  * and child table at the same time as it is not very
1711  * clear which one should be given preference.
1712  */
1713  if (childrelid != myrelid &&
1714  (t->columns || list_member_oid(relids_with_collist, childrelid)))
1715  ereport(ERROR,
1717  errmsg("conflicting or redundant column lists for table \"%s\"",
1718  RelationGetRelationName(rel))));
1719 
1720  continue;
1721  }
1722 
1723  /* find_all_inheritors already got lock */
1724  rel = table_open(childrelid, NoLock);
1725  pub_rel = palloc(sizeof(PublicationRelInfo));
1726  pub_rel->relation = rel;
1727  /* child inherits WHERE clause from parent */
1728  pub_rel->whereClause = t->whereClause;
1729 
1730  /* child inherits column list from parent */
1731  pub_rel->columns = t->columns;
1732  rels = lappend(rels, pub_rel);
1733  relids = lappend_oid(relids, childrelid);
1734 
1735  if (t->whereClause)
1736  relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1737 
1738  if (t->columns)
1739  relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1740  }
1741  }
1742  }
1743 
1744  list_free(relids);
1745  list_free(relids_with_rf);
1746 
1747  return rels;
1748 }
1749 
1750 /*
1751  * Close all relations in the list.
1752  */
1753 static void
1755 {
1756  ListCell *lc;
1757 
1758  foreach(lc, rels)
1759  {
1760  PublicationRelInfo *pub_rel;
1761 
1762  pub_rel = (PublicationRelInfo *) lfirst(lc);
1763  table_close(pub_rel->relation, NoLock);
1764  }
1765 
1766  list_free_deep(rels);
1767 }
1768 
1769 /*
1770  * Lock the schemas specified in the schema list in AccessShareLock mode in
1771  * order to prevent concurrent schema deletion.
1772  */
1773 static void
1774 LockSchemaList(List *schemalist)
1775 {
1776  ListCell *lc;
1777 
1778  foreach(lc, schemalist)
1779  {
1780  Oid schemaid = lfirst_oid(lc);
1781 
1782  /* Allow query cancel in case this takes a long time */
1784  LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1785 
1786  /*
1787  * It is possible that by the time we acquire the lock on schema,
1788  * concurrent DDL has removed it. We can test this by checking the
1789  * existence of schema.
1790  */
1792  ereport(ERROR,
1793  errcode(ERRCODE_UNDEFINED_SCHEMA),
1794  errmsg("schema with OID %u does not exist", schemaid));
1795  }
1796 }
1797 
1798 /*
1799  * Add listed tables to the publication.
1800  */
1801 static void
1802 PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1803  AlterPublicationStmt *stmt)
1804 {
1805  ListCell *lc;
1806 
1807  Assert(!stmt || !stmt->for_all_tables);
1808 
1809  foreach(lc, rels)
1810  {
1811  PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1812  Relation rel = pub_rel->relation;
1813  ObjectAddress obj;
1814 
1815  /* Must be owner of the table or superuser. */
1819 
1820  obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1821  if (stmt)
1822  {
1824  (Node *) stmt);
1825 
1826  InvokeObjectPostCreateHook(PublicationRelRelationId,
1827  obj.objectId, 0);
1828  }
1829  }
1830 }
1831 
1832 /*
1833  * Remove listed tables from the publication.
1834  */
1835 static void
1836 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1837 {
1838  ObjectAddress obj;
1839  ListCell *lc;
1840  Oid prid;
1841 
1842  foreach(lc, rels)
1843  {
1844  PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1845  Relation rel = pubrel->relation;
1846  Oid relid = RelationGetRelid(rel);
1847 
1848  if (pubrel->columns)
1849  ereport(ERROR,
1850  errcode(ERRCODE_SYNTAX_ERROR),
1851  errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1852 
1853  prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1854  ObjectIdGetDatum(relid),
1855  ObjectIdGetDatum(pubid));
1856  if (!OidIsValid(prid))
1857  {
1858  if (missing_ok)
1859  continue;
1860 
1861  ereport(ERROR,
1862  (errcode(ERRCODE_UNDEFINED_OBJECT),
1863  errmsg("relation \"%s\" is not part of the publication",
1864  RelationGetRelationName(rel))));
1865  }
1866 
1867  if (pubrel->whereClause)
1868  ereport(ERROR,
1869  (errcode(ERRCODE_SYNTAX_ERROR),
1870  errmsg("cannot use a WHERE clause when removing a table from a publication")));
1871 
1872  ObjectAddressSet(obj, PublicationRelRelationId, prid);
1873  performDeletion(&obj, DROP_CASCADE, 0);
1874  }
1875 }
1876 
1877 /*
1878  * Add listed schemas to the publication.
1879  */
1880 static void
1881 PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1882  AlterPublicationStmt *stmt)
1883 {
1884  ListCell *lc;
1885 
1886  Assert(!stmt || !stmt->for_all_tables);
1887 
1888  foreach(lc, schemas)
1889  {
1890  Oid schemaid = lfirst_oid(lc);
1891  ObjectAddress obj;
1892 
1893  obj = publication_add_schema(pubid, schemaid, if_not_exists);
1894  if (stmt)
1895  {
1897  (Node *) stmt);
1898 
1899  InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1900  obj.objectId, 0);
1901  }
1902  }
1903 }
1904 
1905 /*
1906  * Remove listed schemas from the publication.
1907  */
1908 static void
1909 PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1910 {
1911  ObjectAddress obj;
1912  ListCell *lc;
1913  Oid psid;
1914 
1915  foreach(lc, schemas)
1916  {
1917  Oid schemaid = lfirst_oid(lc);
1918 
1920  Anum_pg_publication_namespace_oid,
1921  ObjectIdGetDatum(schemaid),
1922  ObjectIdGetDatum(pubid));
1923  if (!OidIsValid(psid))
1924  {
1925  if (missing_ok)
1926  continue;
1927 
1928  ereport(ERROR,
1929  (errcode(ERRCODE_UNDEFINED_OBJECT),
1930  errmsg("tables from schema \"%s\" are not part of the publication",
1931  get_namespace_name(schemaid))));
1932  }
1933 
1934  ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1935  performDeletion(&obj, DROP_CASCADE, 0);
1936  }
1937 }
1938 
1939 /*
1940  * Internal workhorse for changing a publication owner
1941  */
1942 static void
1944 {
1945  Form_pg_publication form;
1946 
1947  form = (Form_pg_publication) GETSTRUCT(tup);
1948 
1949  if (form->pubowner == newOwnerId)
1950  return;
1951 
1952  if (!superuser())
1953  {
1954  AclResult aclresult;
1955 
1956  /* Must be owner */
1957  if (!pg_publication_ownercheck(form->oid, GetUserId()))
1959  NameStr(form->pubname));
1960 
1961  /* Must be able to become new owner */
1962  check_is_member_of_role(GetUserId(), newOwnerId);
1963 
1964  /* New owner must have CREATE privilege on database */
1965  aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
1966  if (aclresult != ACLCHECK_OK)
1967  aclcheck_error(aclresult, OBJECT_DATABASE,
1969 
1970  if (form->puballtables && !superuser_arg(newOwnerId))
1971  ereport(ERROR,
1972  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1973  errmsg("permission denied to change owner of publication \"%s\"",
1974  NameStr(form->pubname)),
1975  errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1976 
1977  if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
1978  ereport(ERROR,
1979  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1980  errmsg("permission denied to change owner of publication \"%s\"",
1981  NameStr(form->pubname)),
1982  errhint("The owner of a FOR ALL TABLES IN SCHEMA publication must be a superuser.")));
1983  }
1984 
1985  form->pubowner = newOwnerId;
1986  CatalogTupleUpdate(rel, &tup->t_self, tup);
1987 
1988  /* Update owner dependency reference */
1989  changeDependencyOnOwner(PublicationRelationId,
1990  form->oid,
1991  newOwnerId);
1992 
1993  InvokeObjectPostAlterHook(PublicationRelationId,
1994  form->oid, 0);
1995 }
1996 
1997 /*
1998  * Change publication owner -- by name
1999  */
2001 AlterPublicationOwner(const char *name, Oid newOwnerId)
2002 {
2003  Oid subid;
2004  HeapTuple tup;
2005  Relation rel;
2006  ObjectAddress address;
2007  Form_pg_publication pubform;
2008 
2009  rel = table_open(PublicationRelationId, RowExclusiveLock);
2010 
2012 
2013  if (!HeapTupleIsValid(tup))
2014  ereport(ERROR,
2015  (errcode(ERRCODE_UNDEFINED_OBJECT),
2016  errmsg("publication \"%s\" does not exist", name)));
2017 
2018  pubform = (Form_pg_publication) GETSTRUCT(tup);
2019  subid = pubform->oid;
2020 
2021  AlterPublicationOwner_internal(rel, tup, newOwnerId);
2022 
2023  ObjectAddressSet(address, PublicationRelationId, subid);
2024 
2025  heap_freetuple(tup);
2026 
2028 
2029  return address;
2030 }
2031 
2032 /*
2033  * Change publication owner -- by OID
2034  */
2035 void
2037 {
2038  HeapTuple tup;
2039  Relation rel;
2040 
2041  rel = table_open(PublicationRelationId, RowExclusiveLock);
2042 
2044 
2045  if (!HeapTupleIsValid(tup))
2046  ereport(ERROR,
2047  (errcode(ERRCODE_UNDEFINED_OBJECT),
2048  errmsg("publication with OID %u does not exist", subid)));
2049 
2050  AlterPublicationOwner_internal(rel, tup, newOwnerId);
2051 
2052  heap_freetuple(tup);
2053 
2055 }
void check_is_member_of_role(Oid member, Oid role)
Definition: acl.c:5003
AclResult
Definition: acl.h:181
@ ACLCHECK_OK
Definition: acl.h:182
@ ACLCHECK_NOT_OWNER
Definition: acl.h:184
bool pg_class_ownercheck(Oid class_oid, Oid roleid)
Definition: aclchk.c:5171
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3512
AclResult pg_database_aclcheck(Oid db_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:5033
bool pg_publication_ownercheck(Oid pub_oid, Oid roleid)
Definition: aclchk.c:5709
int16 AttrNumber
Definition: attnum.h:21
#define InvalidAttrNumber
Definition: attnum.h:23
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:94
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1045
void bms_free(Bitmapset *a)
Definition: bitmapset.c:208
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:427
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:738
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define TextDatumGetCString(d)
Definition: builtins.h:86
#define NameStr(name)
Definition: c.h:681
#define OidIsValid(objectId)
Definition: c.h:710
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:391
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2994
bool defGetBoolean(DefElem *def)
Definition: define.c:108
char * defGetString(DefElem *def)
Definition: define.c:49
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition: define.c:352
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:317
int errdetail(const char *fmt,...)
Definition: elog.c:1037
int errhint(const char *fmt,...)
Definition: elog.c:1151
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define _(x)
Definition: elog.c:89
#define WARNING
Definition: elog.h:30
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define ereport(elevel,...)
Definition: elog.h:143
const char * name
Definition: encode.c:561
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:3564
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:631
Oid MyDatabaseId
Definition: globals.c:89
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
Definition: heaptuple.c:359
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:649
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:221
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:350
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1422
void CacheInvalidateRelcacheAll(void)
Definition: inval.c:1387
int x
Definition: isn.c:71
Assert(fmt[strlen(fmt) - 1] !='\n')
List * list_difference_oid(const List *list1, const List *list2)
Definition: list.c:1272
List * list_concat_unique_oid(List *list1, const List *list2)
Definition: list.c:1428
List * lappend(List *list, void *datum)
Definition: list.c:336
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
void list_free(List *list)
Definition: list.c:1505
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:701
List * list_concat_copy(const List *list1, const List *list2)
Definition: list.c:577
List * list_append_unique_oid(List *list, Oid datum)
Definition: list.c:1339
void list_free_deep(List *list)
Definition: list.c:1519
void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1005
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
#define RowExclusiveLock
Definition: lockdefs.h:38
AttrNumber get_attnum(Oid relid, const char *attname)
Definition: lsyscache.c:856
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3326
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1984
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1909
char func_volatile(Oid funcid)
Definition: lsyscache.c:1761
char * get_attname(Oid relid, AttrNumber attnum, bool missing_ok)
Definition: lsyscache.c:825
void * palloc(Size size)
Definition: mcxt.c:1068
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
Oid GetUserId(void)
Definition: miscinit.c:492
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
List * fetch_search_path(bool includeImplicit)
Definition: namespace.c:4431
Oid get_namespace_oid(const char *nspname, bool missing_ok)
Definition: namespace.c:3089
Oid exprType(const Node *expr)
Definition: nodeFuncs.c:41
Oid exprInputCollation(const Node *expr)
Definition: nodeFuncs.c:1034
bool check_functions_in_node(Node *node, check_function_callback checker, void *context)
Definition: nodeFuncs.c:1832
Oid exprCollation(const Node *expr)
Definition: nodeFuncs.c:788
bool expression_tree_walker(Node *node, bool(*walker)(), void *context)
Definition: nodeFuncs.c:2015
int exprLocation(const Node *expr)
Definition: nodeFuncs.c:1343
#define IsA(nodeptr, _type_)
Definition: nodes.h:624
#define copyObject(obj)
Definition: nodes.h:689
#define nodeTag(nodeptr)
Definition: nodes.h:578
@ T_CoalesceExpr
Definition: nodes.h:187
@ T_List
Definition: nodes.h:317
@ T_ArrayExpr
Definition: nodes.h:184
@ T_CollateExpr
Definition: nodes.h:180
@ T_BoolExpr
Definition: nodes.h:170
@ T_OpExpr
Definition: nodes.h:166
@ T_ScalarArrayOpExpr
Definition: nodes.h:169
@ T_CaseExpr
Definition: nodes.h:181
@ T_RelabelType
Definition: nodes.h:176
@ T_XmlExpr
Definition: nodes.h:190
@ T_RowExpr
Definition: nodes.h:185
@ T_MinMaxExpr
Definition: nodes.h:188
@ T_BooleanTest
Definition: nodes.h:192
@ T_Const
Definition: nodes.h:158
@ T_DistinctExpr
Definition: nodes.h:167
@ T_FuncExpr
Definition: nodes.h:164
@ T_Var
Definition: nodes.h:157
@ T_RowCompareExpr
Definition: nodes.h:186
@ T_CaseTestExpr
Definition: nodes.h:183
@ T_NullTest
Definition: nodes.h:191
@ T_NullIfExpr
Definition: nodes.h:168
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:171
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:195
ObjectType get_relkind_objtype(char relkind)
const ObjectAddress InvalidObjectAddress
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
Node * transformWhereClause(ParseState *pstate, Node *clause, ParseExprKind exprKind, const char *constructName)
void assign_expr_collations(ParseState *pstate, Node *expr)
void free_parsestate(ParseState *pstate)
Definition: parse_node.c:76
int parser_errposition(ParseState *pstate, int location)
Definition: parse_node.c:110
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:43
@ EXPR_KIND_WHERE
Definition: parse_node.h:46
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
void addNSItemToQuery(ParseState *pstate, ParseNamespaceItem *nsitem, bool addToJoinList, bool addToRelNameSpace, bool addToVarNameSpace)
PublicationObjSpecType
Definition: parsenodes.h:4007
@ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA
Definition: parsenodes.h:4010
@ PUBLICATIONOBJ_TABLES_IN_SCHEMA
Definition: parsenodes.h:4009
@ PUBLICATIONOBJ_TABLE
Definition: parsenodes.h:4008
@ AP_DropObjects
Definition: parsenodes.h:4036
@ AP_SetObjects
Definition: parsenodes.h:4037
@ AP_AddObjects
Definition: parsenodes.h:4035
@ DROP_CASCADE
Definition: parsenodes.h:2208
@ OBJECT_DATABASE
Definition: parsenodes.h:2143
@ OBJECT_PUBLICATION
Definition: parsenodes.h:2164
#define ACL_CREATE
Definition: parsenodes.h:91
int16 attnum
Definition: pg_attribute.h:83
NameData relname
Definition: pg_class.h:38
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:256
#define lfirst(lc)
Definition: pg_list.h:169
#define lfirst_node(type, lc)
Definition: pg_list.h:172
static int list_length(const List *l)
Definition: pg_list.h:149
#define NIL
Definition: pg_list.h:65
#define linitial_oid(l)
Definition: pg_list.h:176
#define lfirst_oid(lc)
Definition: pg_list.h:171
bool is_schema_publication(Oid pubid)
List * GetPublicationSchemas(Oid pubid)
ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists)
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
List * GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
FormData_pg_publication * Form_pg_publication
@ PUBLICATION_PART_ROOT
@ PUBLICATION_PART_ALL
FormData_pg_publication_namespace * Form_pg_publication_namespace
FormData_pg_publication_rel * Form_pg_publication_rel
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:312
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:164
#define CStringGetDatum(X)
Definition: postgres.h:622
uintptr_t Datum
Definition: postgres.h:411
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define BoolGetDatum(X)
Definition: postgres.h:446
unsigned int Oid
Definition: postgres_ext.h:31
struct rf_context rf_context
static void CheckPubRelationColumnList(List *tables, const char *queryString, bool pubviaroot)
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt)
static bool contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
static void AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, List *schemaidlist)
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
void InvalidatePublicationRels(List *relids)
static void CloseTableList(List *rels)
void RemovePublicationSchemaById(Oid psoid)
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
static void TransformPubWhereClauses(List *tables, const char *queryString, bool pubviaroot)
ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root)
bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
bool pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, List **rels, List **schemas)
static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist)
static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context)
static void CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, PublicationObjSpecType checkobjtype)
void RemovePublicationById(Oid pubid)
static void expr_allowed_in_node(Node *node, ParseState *pstate, char **errdetail_msg)
static List * OpenTableList(List *tables)
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
static List * OpenRelIdList(List *relids)
static bool check_simple_rowfilter_expr(Node *node, ParseState *pstate)
static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist, const char *queryString)
void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
void RemovePublicationRelById(Oid proid)
ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId)
void AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
static void AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
static void LockSchemaList(List *schemalist)
static bool check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
#define MAX_RELCACHE_INVAL_MSGS
void * stringToNode(const char *str)
Definition: read.c:89
#define RelationGetRelid(relation)
Definition: rel.h:489
#define RelationGetDescr(relation)
Definition: rel.h:515
#define RelationGetRelationName(relation)
Definition: rel.h:523
#define RelationGetNamespace(relation)
Definition: rel.h:530
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition: relcache.c:5105
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
Definition: relcache.h:60
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
AlterPublicationAction action
Definition: parsenodes.h:4054
char * defname
Definition: parsenodes.h:765
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:51
Definition: nodes.h:574
const char * p_sourcetext
Definition: parse_node.h:182
PublicationObjSpecType pubobjtype
Definition: parsenodes.h:4018
PublicationTable * pubtable
Definition: parsenodes.h:4020
RangeVar * relation
Definition: parsenodes.h:3998
bool inh
Definition: primnodes.h:69
Form_pg_class rd_rel
Definition: rel.h:109
Definition: primnodes.h:196
AttrNumber varattno
Definition: primnodes.h:200
Bitmapset * bms_replident
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
bool superuser(void)
Definition: superuser.c:46
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1221
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1173
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1434
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:1184
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:179
@ PUBLICATIONREL
Definition: syscache.h:84
@ PUBLICATIONOID
Definition: syscache.h:83
@ PUBLICATIONNAME
Definition: syscache.h:80
@ PUBLICATIONNAMESPACE
Definition: syscache.h:81
@ PUBLICATIONNAMESPACEMAP
Definition: syscache.h:82
@ PUBLICATIONRELMAP
Definition: syscache.h:85
@ NAMESPACEOID
Definition: syscache.h:70
#define SearchSysCacheExists1(cacheId, key1)
Definition: syscache.h:188
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:197
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:199
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: table.c:102
#define FirstNormalObjectId
Definition: transam.h:197
#define strVal(v)
Definition: value.h:72
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3715
void CommandCounterIncrement(void)
Definition: xact.c:1074
int wal_level
Definition: xlog.c:132
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:71