PostgreSQL Source Code  git master
publicationcmds.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * publicationcmds.c
4  * publication manipulation
5  *
6  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  * src/backend/commands/publicationcmds.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/htup_details.h"
18 #include "access/table.h"
19 #include "access/xact.h"
20 #include "catalog/catalog.h"
21 #include "catalog/indexing.h"
22 #include "catalog/namespace.h"
23 #include "catalog/objectaccess.h"
24 #include "catalog/objectaddress.h"
25 #include "catalog/pg_database.h"
26 #include "catalog/pg_inherits.h"
27 #include "catalog/pg_namespace.h"
28 #include "catalog/pg_proc.h"
29 #include "catalog/pg_publication.h"
32 #include "commands/dbcommands.h"
33 #include "commands/defrem.h"
34 #include "commands/event_trigger.h"
36 #include "miscadmin.h"
37 #include "nodes/nodeFuncs.h"
38 #include "parser/parse_clause.h"
39 #include "parser/parse_collate.h"
40 #include "parser/parse_relation.h"
41 #include "storage/lmgr.h"
42 #include "utils/acl.h"
43 #include "utils/builtins.h"
44 #include "utils/inval.h"
45 #include "utils/lsyscache.h"
46 #include "utils/rel.h"
47 #include "utils/syscache.h"
48 #include "utils/varlena.h"
49 
50 
51 /*
52  * Information used to validate the columns in the row filter expression. See
53  * contain_invalid_rfcolumn_walker for details.
 *
 * pub_rf_contains_invalid_column fills in one of these and hands it to the
 * walker: bms_replident receives the result of RelationGetIndexAttrBitmap(),
 * relid is the relation being checked, and parentid is the relation whose
 * row filter is examined (the same relation unless a published ancestor was
 * found).  When pubviaroot is set, the walker maps each Var's attribute
 * number from parentid to relid by column name before using it.
 *
 * NOTE(review): the closing line of this typedef is not present in this
 * listing (the embedded line numbers jump from 61 to 63).
54  */
55 typedef struct rf_context
56 {
57  Bitmapset *bms_replident; /* bitset of replica identity columns */
58  bool pubviaroot; /* true if we are validating the parent
59  * relation's row filter */
60  Oid relid; /* relid of the relation */
61  Oid parentid; /* relid of the parent relation */
63 
/*
 * Forward declarations for file-local helpers that are used before their
 * definitions.
 *
 * NOTE(review): the continuation lines of the PublicationAddTables and
 * PublicationAddSchemas prototypes (embedded lines 68 and 71) are missing
 * from this listing.  The callers visible in this file pass a fourth
 * argument (NULL or the ALTER statement) after if_not_exists.
 */
64 static List *OpenTableList(List *tables);
65 static void CloseTableList(List *rels);
66 static void LockSchemaList(List *schemalist);
67 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
69 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
72 static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73 
74 
/*
 * Parse the DefElem list 'options' given to a publication command.
 *
 * Two option names are recognized:
 *   "publish"                     a comma-separated list drawn from insert,
 *                                 update, delete and truncate
 *   "publish_via_partition_root"  a boolean
 *
 * *publish_given and *publish_via_partition_root_given report whether the
 * corresponding option appeared.  Repeating an option is reported through
 * errorConflictingDefElem().
 *
 * The value outputs are always set: without "publish" all four actions in
 * *pubactions are true, and without "publish_via_partition_root" the flag is
 * false.  An unknown option name, a malformed list, or an unknown publish
 * value raises ERROR with ERRCODE_SYNTAX_ERROR.
 *
 * NOTE(review): the line holding the function name and its first parameter
 * (embedded line 76) is missing from this listing; the body refers to a
 * 'pstate' variable that is presumably declared there.
 */
75 static void
77  List *options,
78  bool *publish_given,
79  PublicationActions *pubactions,
80  bool *publish_via_partition_root_given,
81  bool *publish_via_partition_root)
82 {
83  ListCell *lc;
84 
85  *publish_given = false;
86  *publish_via_partition_root_given = false;
87 
88  /* defaults */
89  pubactions->pubinsert = true;
90  pubactions->pubupdate = true;
91  pubactions->pubdelete = true;
92  pubactions->pubtruncate = true;
93  *publish_via_partition_root = false;
94 
95  /* Parse options */
96  foreach(lc, options)
97  {
98  DefElem *defel = (DefElem *) lfirst(lc);
99 
100  if (strcmp(defel->defname, "publish") == 0)
101  {
102  char *publish;
103  List *publish_list;
104  ListCell *lc2;
105 
  /* A second "publish" entry in the same list is a conflict. */
106  if (*publish_given)
107  errorConflictingDefElem(defel, pstate);
108 
109  /*
110  * If publish option was given only the explicitly listed actions
111  * should be published.
112  */
113  pubactions->pubinsert = false;
114  pubactions->pubupdate = false;
115  pubactions->pubdelete = false;
116  pubactions->pubtruncate = false;
117 
118  *publish_given = true;
119  publish = defGetString(defel);
120 
  /* Split the option value on commas into publish_list. */
121  if (!SplitIdentifierString(publish, ',', &publish_list))
122  ereport(ERROR,
123  (errcode(ERRCODE_SYNTAX_ERROR),
124  errmsg("invalid list syntax in parameter \"%s\"",
125  "publish")));
126 
127  /* Process the option list. */
128  foreach(lc2, publish_list)
129  {
130  char *publish_opt = (char *) lfirst(lc2);
131 
132  if (strcmp(publish_opt, "insert") == 0)
133  pubactions->pubinsert = true;
134  else if (strcmp(publish_opt, "update") == 0)
135  pubactions->pubupdate = true;
136  else if (strcmp(publish_opt, "delete") == 0)
137  pubactions->pubdelete = true;
138  else if (strcmp(publish_opt, "truncate") == 0)
139  pubactions->pubtruncate = true;
140  else
141  ereport(ERROR,
142  (errcode(ERRCODE_SYNTAX_ERROR),
143  errmsg("unrecognized value for publication option \"%s\": \"%s\"",
144  "publish", publish_opt)));
145  }
146  }
147  else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
148  {
149  if (*publish_via_partition_root_given)
150  errorConflictingDefElem(defel, pstate);
151  *publish_via_partition_root_given = true;
152  *publish_via_partition_root = defGetBoolean(defel);
153  }
154  else
155  ereport(ERROR,
156  (errcode(ERRCODE_SYNTAX_ERROR),
157  errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
158  }
159 }
160 
161 /*
162  * Convert the PublicationObjSpecType list into schema oid list and
163  * PublicationTable list.
 *
 * Each PublicationObjSpec in 'pubobjspec_list' is dispatched on its
 * pubobjtype.  One branch appends pubobj->pubtable to *rels unchanged.  A
 * second resolves pubobj->name with get_namespace_oid(..., false).  A third
 * takes the first entry of fetch_search_path(false) and raises
 * ERRCODE_UNDEFINED_SCHEMA if that list is empty.  Schema OIDs are added to
 * *schemas with list_append_unique_oid(), so duplicates are dropped.
 *
 * Both output lists are appended to, never reset, so callers must
 * initialize them.  An empty input list leaves them untouched.  'pstate' is
 * not referenced by the code visible here.
 *
 * NOTE(review): the three case labels of the switch (embedded lines 184,
 * 187 and 193) are missing from this listing, so which pubobjtype value
 * selects which branch cannot be confirmed from here.
164  */
165 static void
166 ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
167  List **rels, List **schemas)
168 {
169  ListCell *cell;
170  PublicationObjSpec *pubobj;
171 
172  if (!pubobjspec_list)
173  return;
174 
175  foreach(cell, pubobjspec_list)
176  {
177  Oid schemaid;
178  List *search_path;
179 
180  pubobj = (PublicationObjSpec *) lfirst(cell);
181 
182  switch (pubobj->pubobjtype)
183  {
185  *rels = lappend(*rels, pubobj->pubtable);
186  break;
188  schemaid = get_namespace_oid(pubobj->name, false);
189 
190  /* Filter out duplicates if user specifies "sch1, sch1" */
191  *schemas = list_append_unique_oid(*schemas, schemaid);
192  break;
194  search_path = fetch_search_path(false);
195  if (search_path == NIL) /* nothing valid in search_path? */
196  ereport(ERROR,
197  errcode(ERRCODE_UNDEFINED_SCHEMA),
198  errmsg("no schema has been selected for CURRENT_SCHEMA"));
199 
  /* Only the first search_path entry is used; the list itself is freed. */
200  schemaid = linitial_oid(search_path);
201  list_free(search_path);
202 
203  /* Filter out duplicates if user specifies "sch1, sch1" */
204  *schemas = list_append_unique_oid(*schemas, schemaid);
205  break;
206  default:
207  /* shouldn't happen */
208  elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
209  break;
210  }
211  }
212 }
213 
214 /*
215  * Returns true if any of the columns used in the row filter WHERE expression is
216  * not part of REPLICA IDENTITY, false otherwise.
 *
 * A NULL node yields false.  For a Var node the attribute number is taken
 * from varattno.  When context->pubviaroot is set it is first translated:
 * the column name is looked up in context->parentid and then resolved to an
 * attribute number in context->relid.
 *
 * NOTE(review): three lines are missing from this listing: the signature
 * (embedded line 219), the first half of the test against
 * context->bms_replident (line 242) and the start of the final return
 * statement (line 247).  The remaining fragment passes context as a void
 * pointer, which suggests a recursive tree-walker call -- confirm against
 * the full source.
217  */
218 static bool
220 {
221  if (node == NULL)
222  return false;
223 
224  if (IsA(node, Var))
225  {
226  Var *var = (Var *) node;
227  AttrNumber attnum = var->varattno;
228 
229  /*
230  * If pubviaroot is true, we are validating the row filter of the
231  * parent table, but the bitmap contains the replica identity
232  * information of the child table. So, get the column number of the
233  * child table as parent and child column order could be different.
234  */
235  if (context->pubviaroot)
236  {
237  char *colname = get_attname(context->parentid, attnum, false);
238 
239  attnum = get_attnum(context->relid, colname);
240  }
241 
243  context->bms_replident))
244  return true;
245  }
246 
248  (void *) context);
249 }
250 
251 /*
252  * Check if all columns referenced in the filter expression are part of the
253  * REPLICA IDENTITY index or not.
254  *
255  * Returns true if any invalid column is found.
 *
 * Returns false without examining the filter when the relation uses
 * REPLICA IDENTITY FULL, when there is no pg_publication_rel entry for the
 * relation (or for its published ancestor) in publication 'pubid', or when
 * that entry has a NULL prqual.
 *
 * 'ancestors' is passed to GetTopMostAncestorInPublication() and is only
 * consulted when 'pubviaroot' is true and the relation is a partition.
 *
 * NOTE(review): the second argument line of the
 * RelationGetIndexAttrBitmap() call (embedded line 315) is missing from
 * this listing.
256  */
257 bool
258 pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
259  bool pubviaroot)
260 {
261  HeapTuple rftuple;
262  Oid relid = RelationGetRelid(relation);
263  Oid publish_as_relid = RelationGetRelid(relation);
264  bool result = false;
265  Datum rfdatum;
266  bool rfisnull;
267 
268  /*
269  * FULL means all columns are in the REPLICA IDENTITY, so all columns are
270  * allowed in the row filter and we can skip the validation.
271  */
272  if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
273  return false;
274 
275  /*
276  * For a partition, if pubviaroot is true, find the topmost ancestor that
277  * is published via this publication as we need to use its row filter
278  * expression to filter the partition's changes.
279  *
280  * Note that even though the row filter used is for an ancestor, the
281  * REPLICA IDENTITY used will be for the actual child table.
282  */
283  if (pubviaroot && relation->rd_rel->relispartition)
284  {
285  publish_as_relid
286  = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
287 
  /* No published ancestor: fall back to the relation itself. */
288  if (!OidIsValid(publish_as_relid))
289  publish_as_relid = relid;
290  }
291 
292  rftuple = SearchSysCache2(PUBLICATIONRELMAP,
293  ObjectIdGetDatum(publish_as_relid),
294  ObjectIdGetDatum(pubid));
295 
296  if (!HeapTupleIsValid(rftuple))
297  return false;
298 
299  rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
300  Anum_pg_publication_rel_prqual,
301  &rfisnull);
302 
303  if (!rfisnull)
304  {
305  rf_context context = {0};
306  Node *rfnode;
307  Bitmapset *bms = NULL;
308 
  /*
   * The caller's pubviaroot flag is passed through unchanged, so the
   * walker does its name-based attnum translation whenever it is set.
   */
309  context.pubviaroot = pubviaroot;
310  context.parentid = publish_as_relid;
311  context.relid = relid;
312 
313  /* Remember columns that are part of the REPLICA IDENTITY */
314  bms = RelationGetIndexAttrBitmap(relation,
316 
317  context.bms_replident = bms;
  /* prqual is stored as text; rebuild the node tree before walking it. */
318  rfnode = stringToNode(TextDatumGetCString(rfdatum));
319  result = contain_invalid_rfcolumn_walker(rfnode, &context);
320  }
321 
322  ReleaseSysCache(rftuple);
323 
324  return result;
325 }
326 
327 /*
328  * Check if all columns referenced in the REPLICA IDENTITY are covered by
329  * the column list.
330  *
331  * Returns true if any replica identity column is not covered by column list.
 *
 * Also returns true when a column list exists and the relation uses
 * REPLICA IDENTITY FULL.  Returns false when there is no pg_publication_rel
 * entry for the relation (or its published ancestor) in publication
 * 'pubid', or when that entry has a NULL prattrs.
 *
 * NOTE(review): three lines are missing from this listing: the line with
 * the function name and leading parameters (embedded line 334), the second
 * argument line of the RelationGetIndexAttrBitmap() call (line 386), and
 * the first line of the loop body (line 398), which presumably declares
 * 'attnum' from the bitmap member 'x'.
332  */
333 bool
335  bool pubviaroot)
336 {
337  HeapTuple tuple;
338  Oid relid = RelationGetRelid(relation);
339  Oid publish_as_relid = RelationGetRelid(relation);
340  bool result = false;
341  Datum datum;
342  bool isnull;
343 
344  /*
345  * For a partition, if pubviaroot is true, find the topmost ancestor that
346  * is published via this publication as we need to use its column list for
347  * the changes.
348  *
349  * Note that even though the column list used is for an ancestor, the
350  * REPLICA IDENTITY used will be for the actual child table.
351  */
352  if (pubviaroot && relation->rd_rel->relispartition)
353  {
354  publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
355 
  /* No published ancestor: fall back to the relation itself. */
356  if (!OidIsValid(publish_as_relid))
357  publish_as_relid = relid;
358  }
359 
360  tuple = SearchSysCache2(PUBLICATIONRELMAP,
361  ObjectIdGetDatum(publish_as_relid),
362  ObjectIdGetDatum(pubid));
363 
364  if (!HeapTupleIsValid(tuple))
365  return false;
366 
367  datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
368  Anum_pg_publication_rel_prattrs,
369  &isnull);
370 
371  if (!isnull)
372  {
373  int x;
374  Bitmapset *idattrs;
375  Bitmapset *columns = NULL;
376 
377  /* With REPLICA IDENTITY FULL, no column list is allowed. */
  /*
   * There is no early exit here: the loop below still runs, but it can
   * only set result to true, so the outcome is unchanged.
   */
378  if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
379  result = true;
380 
381  /* Transform the column list datum to a bitmapset. */
382  columns = pub_collist_to_bitmapset(NULL, datum, NULL);
383 
384  /* Remember columns that are part of the REPLICA IDENTITY */
385  idattrs = RelationGetIndexAttrBitmap(relation,
387 
388  /*
389  * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
390  * offset (to handle system columns the usual way), while column list
391  * does not use offset, so we can't do bms_is_subset(). Instead, we
392  * have to loop over the idattrs and check all of them are in the
393  * list.
394  */
395  x = -1;
396  while ((x = bms_next_member(idattrs, x)) >= 0)
397  {
399 
400  /*
401  * If pubviaroot is true, we are validating the column list of the
402  * parent table, but the bitmap contains the replica identity
403  * information of the child table. The parent/child attnums may
404  * not match, so translate them to the parent - get the attname
405  * from the child, and look it up in the parent.
406  */
407  if (pubviaroot)
408  {
409  /* attribute name in the child table */
410  char *colname = get_attname(relid, attnum, false);
411 
412  /*
413  * Determine the attnum for the attribute name in parent (we
414  * are using the column list defined on the parent).
415  */
416  attnum = get_attnum(publish_as_relid, colname);
417  }
418 
419  /* replica identity column, not covered by the column list */
420  if (!bms_is_member(attnum, columns))
421  {
422  result = true;
423  break;
424  }
425  }
426 
427  bms_free(idattrs);
428  bms_free(columns);
429  }
430 
431  ReleaseSysCache(tuple);
432 
433  return result;
434 }
435 
436 /* check_functions_in_node callback */
/*
 * Returns true when the function identified by func_id is unacceptable:
 * either func_volatile() reports something other than PROVOLATILE_IMMUTABLE,
 * or the OID is at or above FirstNormalObjectId.
 *
 * NOTE(review): the signature line (embedded line 438) is missing from this
 * listing, so the parameter list cannot be confirmed from here.
 */
437 static bool
439 {
440  return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
441  func_id >= FirstNormalObjectId);
442 }
443 
444 /*
445  * The row filter walker checks if the row filter expression is a "simple
446  * expression".
447  *
448  * It allows only simple or compound expressions such as:
449  * - (Var Op Const)
450  * - (Var Op Var)
451  * - (Var Op Const) AND/OR (Var Op Const)
452  * - etc
453  * (where Var is a column of the table this filter belongs to)
454  *
455  * The simple expression has the following restrictions:
456  * - User-defined operators are not allowed;
457  * - User-defined functions are not allowed;
458  * - User-defined types are not allowed;
459  * - User-defined collations are not allowed;
460  * - Non-immutable built-in functions are not allowed;
461  * - System columns are not allowed.
462  *
463  * NOTES
464  *
465  * We don't allow user-defined functions/operators/types/collations because
466  * (a) if a user drops a user-defined object used in a row filter expression or
467  * if there is any other error while using it, the logical decoding
468  * infrastructure won't be able to recover from such an error even if the
469  * object is recreated again because a historic snapshot is used to evaluate
470  * the row filter;
471  * (b) a user-defined function can be used to access tables that could have
472  * unpleasant results because a historic snapshot is used. That's why only
473  * immutable built-in functions are allowed in row filter expressions.
474  *
475  * We don't allow system columns because currently, we don't have that
476  * information in the tuple passed to downstream. Also, as we don't replicate
477  * those to subscribers, there doesn't seem to be a need for a filter on those
478  * columns.
479  *
480  * We can allow other node types after more analysis and testing.
 *
 * Mechanics: the first problem found for a node is stored in errdetail_msg
 * and reported with ERRCODE_FEATURE_NOT_SUPPORTED, using the node's parse
 * location for the error position.  "User-defined" is tested throughout by
 * comparing an OID against FirstNormalObjectId.  A NULL node returns false.
 *
 * NOTE(review): four lines are missing from this listing: the signature
 * (embedded line 483), the start of the function check (line 561), the
 * second half of the collation check (line 565) and the start of the final
 * return statement (line 580).  The remaining fragments pass pstate as a
 * void pointer, which suggests check_functions_in_node() and a recursive
 * tree-walker call -- confirm against the full source.
481  */
482 static bool
484 {
485  char *errdetail_msg = NULL;
486 
487  if (node == NULL)
488  return false;
489 
  /* Stage 1: checks that depend on the node tag. */
490  switch (nodeTag(node))
491  {
492  case T_Var:
493  /* System columns are not allowed. */
494  if (((Var *) node)->varattno < InvalidAttrNumber)
495  errdetail_msg = _("System columns are not allowed.");
496  break;
497  case T_OpExpr:
498  case T_DistinctExpr:
499  case T_NullIfExpr:
500  /* OK, except user-defined operators are not allowed. */
  /* All three tags are read through an OpExpr cast here. */
501  if (((OpExpr *) node)->opno >= FirstNormalObjectId)
502  errdetail_msg = _("User-defined operators are not allowed.");
503  break;
504  case T_ScalarArrayOpExpr:
505  /* OK, except user-defined operators are not allowed. */
506  if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
507  errdetail_msg = _("User-defined operators are not allowed.");
508 
509  /*
510  * We don't need to check the hashfuncid and negfuncid of
511  * ScalarArrayOpExpr as those functions are only built for a
512  * subquery.
513  */
514  break;
515  case T_RowCompareExpr:
516  {
517  ListCell *opid;
518 
519  /* OK, except user-defined operators are not allowed. */
520  foreach(opid, ((RowCompareExpr *) node)->opnos)
521  {
522  if (lfirst_oid(opid) >= FirstNormalObjectId)
523  {
524  errdetail_msg = _("User-defined operators are not allowed.");
525  break;
526  }
527  }
528  }
529  break;
530  case T_Const:
531  case T_FuncExpr:
532  case T_BoolExpr:
533  case T_RelabelType:
534  case T_CollateExpr:
535  case T_CaseExpr:
536  case T_CaseTestExpr:
537  case T_ArrayExpr:
538  case T_RowExpr:
539  case T_CoalesceExpr:
540  case T_MinMaxExpr:
541  case T_XmlExpr:
542  case T_NullTest:
543  case T_BooleanTest:
544  case T_List:
545  /* OK, supported */
546  break;
547  default:
548  errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
549  break;
550  }
551 
552  /*
553  * For all the supported nodes, if we haven't already found a problem,
554  * check the types, functions, and collations used in it. We check List
555  * by walking through each element.
556  */
  /* Stage 2: checks that apply to every node type except List. */
557  if (!errdetail_msg && !IsA(node, List))
558  {
559  if (exprType(node) >= FirstNormalObjectId)
560  errdetail_msg = _("User-defined types are not allowed.");
562  (void *) pstate))
563  errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
564  else if (exprCollation(node) >= FirstNormalObjectId ||
566  errdetail_msg = _("User-defined collations are not allowed.");
567  }
568 
569  /*
570  * If we found a problem in this node, throw error now. Otherwise keep
571  * going.
572  */
573  if (errdetail_msg)
574  ereport(ERROR,
575  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
576  errmsg("invalid publication WHERE expression"),
577  errdetail_internal("%s", errdetail_msg),
578  parser_errposition(pstate, exprLocation(node))));
579 
581  (void *) pstate);
582 }
583 
584 /*
585  * Check if the row filter expression is a "simple expression".
586  *
587  * See check_simple_rowfilter_expr_walker for details.
 *
 * This is a thin entry point that forwards node and pstate to the walker
 * and returns its result.  The caller visible in this file ignores that
 * result.
 *
 * NOTE(review): the signature line (embedded line 590) is missing from this
 * listing.
588  */
589 static bool
591 {
592  return check_simple_rowfilter_expr_walker(node, pstate);
593 }
594 
595 /*
596  * Transform the publication WHERE expression for all the relations in the list,
597  * ensuring it is coerced to boolean and necessary collation information is
598  * added if required, and add a new nsitem/RTE for the associated relation to
599  * the ParseState's namespace list.
600  *
601  * Also check the publication row filter expression and throw an error if
602  * anything not permitted or unexpected is encountered.
 *
 * Entries without a whereClause are skipped.  For the others the
 * transformed expression replaces pri->whereClause in place; the transform
 * works on a copy of the original parse tree.  'queryString' is used only
 * as the source text of the temporary ParseState.
 *
 * When 'pubviaroot' is false, a WHERE clause on a partitioned table raises
 * ERRCODE_INVALID_PARAMETER_VALUE.
 *
 * NOTE(review): three lines are missing from this listing: the declaration
 * of 'pri' (embedded line 615), the relation-name argument of the first
 * errmsg() (line 630) and one argument line of the transformWhereClause()
 * call (line 647).
603  */
604 static void
605 TransformPubWhereClauses(List *tables, const char *queryString,
606  bool pubviaroot)
607 {
608  ListCell *lc;
609 
610  foreach(lc, tables)
611  {
612  ParseNamespaceItem *nsitem;
613  Node *whereclause = NULL;
614  ParseState *pstate;
616 
617  if (pri->whereClause == NULL)
618  continue;
619 
620  /*
621  * If the publication doesn't publish changes via the root partitioned
622  * table, the partition's row filter will be used. So disallow using
623  * WHERE clause on partitioned table in this case.
624  */
625  if (!pubviaroot &&
626  pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
627  ereport(ERROR,
628  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
629  errmsg("cannot use publication WHERE clause for relation \"%s\"",
631  errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
632  "publish_via_partition_root")));
633 
634  /*
635  * A fresh pstate is required so that we only have "this" table in its
636  * rangetable
637  */
638  pstate = make_parsestate(NULL);
639  pstate->p_sourcetext = queryString;
640  nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
641  AccessShareLock, NULL,
642  false, false);
643  addNSItemToQuery(pstate, nsitem, false, true, true);
644 
  /* Transform a copy so the raw parse tree is left unmodified. */
645  whereclause = transformWhereClause(pstate,
646  copyObject(pri->whereClause),
648  "PUBLICATION WHERE");
649 
650  /* Fix up collation information */
651  assign_expr_collations(pstate, whereclause);
652 
653  /*
654  * We allow only simple expressions in row filters. See
655  * check_simple_rowfilter_expr_walker.
656  */
657  check_simple_rowfilter_expr(whereclause, pstate);
658 
659  free_parsestate(pstate);
660 
661  pri->whereClause = whereclause;
662  }
663 }
664 
665 
666 /*
667  * Given a list of tables that are going to be added to a publication,
668  * verify that they fulfill the necessary preconditions, namely: no tables
669  * have a column list if any schema is published; and partitioned tables do
670  * not have column lists if publish_via_partition_root is not set.
671  *
672  * 'publish_schema' indicates that the publication contains any TABLES IN
673  * SCHEMA elements (newly added in this command, or preexisting).
674  * 'pubviaroot' is the value of publish_via_partition_root.
 *
 * 'pubname' is used only in error messages.  Entries whose column list is
 * NIL are skipped.  Both violations raise ERRCODE_INVALID_PARAMETER_VALUE,
 * and the schema check is made first.
 *
 * NOTE(review): three lines are missing from this listing: the declaration
 * of 'pri' (embedded line 684) and the first argument line of each errmsg()
 * (lines 703 and 717), which presumably supplies the schema name for the
 * "%s.%s" placeholder.
675  */
676 static void
677 CheckPubRelationColumnList(char *pubname, List *tables,
678  bool publish_schema, bool pubviaroot)
679 {
680  ListCell *lc;
681 
682  foreach(lc, tables)
683  {
685 
686  if (pri->columns == NIL)
687  continue;
688 
689  /*
690  * Disallow specifying column list if any schema is in the
691  * publication.
692  *
693  * XXX We could instead just forbid the case when the publication
694  * tries to publish the table with a column list and a schema for that
695  * table. However, if we do that then we need a restriction during
696  * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
697  * seem to be a good idea.
698  */
699  if (publish_schema)
700  ereport(ERROR,
701  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
702  errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
704  RelationGetRelationName(pri->relation), pubname),
705  errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
706 
707  /*
708  * If the publication doesn't publish changes via the root partitioned
709  * table, the partition's column list will be used. So disallow using
710  * a column list on the partitioned table in this case.
711  */
712  if (!pubviaroot &&
713  pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
714  ereport(ERROR,
715  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
716  errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
718  RelationGetRelationName(pri->relation), pubname),
719  errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
720  "publish_via_partition_root")));
721  }
722 }
723 
724 /*
725  * Create new publication.
 *
 * Steps visible in this listing, in order:
 *  1. Require ACL_CREATE on the current database, and superuser when
 *     stmt->for_all_tables is set.
 *  2. Open pg_publication with RowExclusiveLock and raise an error if a
 *     publication named stmt->pubname already exists.
 *  3. Parse stmt->options, build and insert the catalog tuple (owned by the
 *     current user), and record the dependency on the owner.
 *  4. Unless FOR ALL TABLES was given, split stmt->pubobjects into tables
 *     and schemas, require superuser when any schema is listed, then
 *     validate and add the tables and lock and add the schemas.
 *  5. Invoke the post-create hook and return the new object's address.
 *
 * Tables and schemas are added with if_not_exists = true.
 *
 * NOTE(review): many lines are missing from this listing (embedded lines
 * 727-728, 748, 763, 772, 775, 808, 814, 832, 854 and 858-859).  These
 * cover the return type and signature, the value stored for pubname, the
 * call that parses the options, the statement that follows "Make the
 * changes visible", the relcache invalidation for FOR ALL TABLES, the first
 * line of the WHERE-clause transformation call, the statement before the
 * post-create hook, and the condition guarding the wal_level message.
 * Confirm these against the full source before relying on this summary.
726  */
729 {
730  Relation rel;
731  ObjectAddress myself;
732  Oid puboid;
733  bool nulls[Natts_pg_publication];
734  Datum values[Natts_pg_publication];
735  HeapTuple tup;
736  bool publish_given;
737  PublicationActions pubactions;
738  bool publish_via_partition_root_given;
739  bool publish_via_partition_root;
740  AclResult aclresult;
741  List *relations = NIL;
742  List *schemaidlist = NIL;
743 
744  /* must have CREATE privilege on database */
745  aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
746  if (aclresult != ACLCHECK_OK)
747  aclcheck_error(aclresult, OBJECT_DATABASE,
749 
750  /* FOR ALL TABLES requires superuser */
751  if (stmt->for_all_tables && !superuser())
752  ereport(ERROR,
753  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
754  errmsg("must be superuser to create FOR ALL TABLES publication")));
755 
756  rel = table_open(PublicationRelationId, RowExclusiveLock);
757 
758  /* Check if name is used */
759  puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
760  CStringGetDatum(stmt->pubname));
761  if (OidIsValid(puboid))
762  ereport(ERROR,
764  errmsg("publication \"%s\" already exists",
765  stmt->pubname)));
766 
767  /* Form a tuple. */
768  memset(values, 0, sizeof(values));
769  memset(nulls, false, sizeof(nulls));
770 
771  values[Anum_pg_publication_pubname - 1] =
773  values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
774 
776  stmt->options,
777  &publish_given, &pubactions,
778  &publish_via_partition_root_given,
779  &publish_via_partition_root);
780 
  /* Allocate the new publication's OID, then fill the remaining columns. */
781  puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
782  Anum_pg_publication_oid);
783  values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
784  values[Anum_pg_publication_puballtables - 1] =
785  BoolGetDatum(stmt->for_all_tables);
786  values[Anum_pg_publication_pubinsert - 1] =
787  BoolGetDatum(pubactions.pubinsert);
788  values[Anum_pg_publication_pubupdate - 1] =
789  BoolGetDatum(pubactions.pubupdate);
790  values[Anum_pg_publication_pubdelete - 1] =
791  BoolGetDatum(pubactions.pubdelete);
792  values[Anum_pg_publication_pubtruncate - 1] =
793  BoolGetDatum(pubactions.pubtruncate);
794  values[Anum_pg_publication_pubviaroot - 1] =
795  BoolGetDatum(publish_via_partition_root);
796 
797  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
798 
799  /* Insert tuple into catalog. */
800  CatalogTupleInsert(rel, tup);
801  heap_freetuple(tup);
802 
803  recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
804 
805  ObjectAddressSet(myself, PublicationRelationId, puboid);
806 
807  /* Make the changes visible. */
809 
810  /* Associate objects with the publication. */
811  if (stmt->for_all_tables)
812  {
813  /* Invalidate relcache so that publication info is rebuilt. */
815  }
816  else
817  {
818  ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
819  &schemaidlist);
820 
821  /* FOR TABLES IN SCHEMA requires superuser */
822  if (schemaidlist != NIL && !superuser())
823  ereport(ERROR,
824  errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
825  errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
826 
827  if (relations != NIL)
828  {
829  List *rels;
830 
831  rels = OpenTableList(relations);
833  publish_via_partition_root);
834 
  /* Column lists are rejected if this command also lists schemas. */
835  CheckPubRelationColumnList(stmt->pubname, rels,
836  schemaidlist != NIL,
837  publish_via_partition_root);
838 
839  PublicationAddTables(puboid, rels, true, NULL);
840  CloseTableList(rels);
841  }
842 
843  if (schemaidlist != NIL)
844  {
845  /*
846  * Schema lock is held until the publication is created to prevent
847  * concurrent schema deletion.
848  */
849  LockSchemaList(schemaidlist);
850  PublicationAddSchemas(puboid, schemaidlist, true, NULL);
851  }
852  }
853 
855 
856  InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
857 
860  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
861  errmsg("wal_level is insufficient to publish logical changes"),
862  errhint("Set wal_level to \"logical\" before creating subscriptions.")));
863 
864  return myself;
865 }
866 
867 /*
868  * Change options of a publication.
 *
 * 'rel' is the publication catalog relation and 'tup' the existing tuple of
 * the publication being altered; 'tup' is replaced locally by the modified
 * copy.  Only the columns for options present in stmt->options are
 * replaced.
 *
 * Before updating, if the publication is not FOR ALL TABLES and
 * publish_via_partition_root is being set to false, every relation returned
 * by GetPublicationRelations() is checked.  A partitioned table that has a
 * row filter or a column list raises ERRCODE_INVALID_PARAMETER_VALUE.
 *
 * After the update the affected relations are invalidated, and the
 * post-alter hook is invoked.
 *
 * NOTE(review): several lines are missing from this listing (embedded lines
 * 871, 886, 910, 913, 1006, 1013, 1027, 1036, 1041 and 1048).  These cover
 * the signature, the call that parses the options, the lock mode argument,
 * the partition-option arguments of the relation lookups, the statement
 * after the catalog update, the invalidation done for FOR ALL TABLES, and
 * the first line of the call that receives (Node *) stmt.
869  */
870 static void
872  Relation rel, HeapTuple tup)
873 {
874  bool nulls[Natts_pg_publication];
875  bool replaces[Natts_pg_publication];
876  Datum values[Natts_pg_publication];
877  bool publish_given;
878  PublicationActions pubactions;
879  bool publish_via_partition_root_given;
880  bool publish_via_partition_root;
881  ObjectAddress obj;
882  Form_pg_publication pubform;
883  List *root_relids = NIL;
884  ListCell *lc;
885 
887  stmt->options,
888  &publish_given, &pubactions,
889  &publish_via_partition_root_given,
890  &publish_via_partition_root);
891 
892  pubform = (Form_pg_publication) GETSTRUCT(tup);
893 
894  /*
895  * If the publication doesn't publish changes via the root partitioned
896  * table, the partition's row filter and column list will be used. So
897  * disallow using WHERE clause and column lists on partitioned table in
898  * this case.
899  */
900  if (!pubform->puballtables && publish_via_partition_root_given &&
901  !publish_via_partition_root)
902  {
903  /*
904  * Lock the publication so nobody else can do anything with it. This
905  * prevents concurrent alter to add partitioned table(s) with WHERE
906  * clause(s) and/or column lists which we don't allow when not
907  * publishing via root.
908  */
909  LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
911 
912  root_relids = GetPublicationRelations(pubform->oid,
914 
915  foreach(lc, root_relids)
916  {
917  Oid relid = lfirst_oid(lc);
918  HeapTuple rftuple;
919  char relkind;
920  char *relname;
921  bool has_rowfilter;
922  bool has_collist;
923 
924  /*
925  * Beware: we don't have lock on the relations, so cope silently
926  * with the cache lookups returning NULL.
927  */
928 
929  rftuple = SearchSysCache2(PUBLICATIONRELMAP,
930  ObjectIdGetDatum(relid),
931  ObjectIdGetDatum(pubform->oid));
932  if (!HeapTupleIsValid(rftuple))
933  continue;
934  has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
935  has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
  /* Neither a filter nor a column list: nothing to object to. */
936  if (!has_rowfilter && !has_collist)
937  {
938  ReleaseSysCache(rftuple);
939  continue;
940  }
941 
  /* Only partitioned tables are restricted. */
942  relkind = get_rel_relkind(relid);
943  if (relkind != RELKIND_PARTITIONED_TABLE)
944  {
945  ReleaseSysCache(rftuple);
946  continue;
947  }
948  relname = get_rel_name(relid);
949  if (relname == NULL) /* table concurrently dropped */
950  {
951  ReleaseSysCache(rftuple);
952  continue;
953  }
954 
  /*
   * From here on one of the two errors below is always raised; the row
   * filter message takes precedence when both are present.
   */
955  if (has_rowfilter)
956  ereport(ERROR,
957  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
958  errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
959  "publish_via_partition_root",
960  stmt->pubname),
961  errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
962  relname, "publish_via_partition_root")));
963  Assert(has_collist);
964  ereport(ERROR,
965  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
966  errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
967  "publish_via_partition_root",
968  stmt->pubname),
969  errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
970  relname, "publish_via_partition_root")));
971  }
972  }
973 
974  /* Everything ok, form a new tuple. */
975  memset(values, 0, sizeof(values));
976  memset(nulls, false, sizeof(nulls));
977  memset(replaces, false, sizeof(replaces));
978 
  /* Replace the four action columns together, only if "publish" was given. */
979  if (publish_given)
980  {
981  values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
982  replaces[Anum_pg_publication_pubinsert - 1] = true;
983 
984  values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
985  replaces[Anum_pg_publication_pubupdate - 1] = true;
986 
987  values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
988  replaces[Anum_pg_publication_pubdelete - 1] = true;
989 
990  values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
991  replaces[Anum_pg_publication_pubtruncate - 1] = true;
992  }
993 
994  if (publish_via_partition_root_given)
995  {
996  values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
997  replaces[Anum_pg_publication_pubviaroot - 1] = true;
998  }
999 
1000  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1001  replaces);
1002 
1003  /* Update the catalog. */
1004  CatalogTupleUpdate(rel, &tup->t_self, tup);
1005 
1007 
  /* Re-read the form pointer: tup now refers to the modified copy. */
1008  pubform = (Form_pg_publication) GETSTRUCT(tup);
1009 
1010  /* Invalidate the relcache. */
1011  if (pubform->puballtables)
1012  {
1014  }
1015  else
1016  {
1017  List *relids = NIL;
1018  List *schemarelids = NIL;
1019 
1020  /*
1021  * For any partitioned tables contained in the publication, we must
1022  * invalidate all partitions contained in the respective partition
1023  * trees, not just those explicitly mentioned in the publication.
1024  */
1025  if (root_relids == NIL)
1026  relids = GetPublicationRelations(pubform->oid,
1028  else
1029  {
1030  /*
1031  * We already got tables explicitly mentioned in the publication.
1032  * Now get all partitions for the partitioned table in the list.
1033  */
1034  foreach(lc, root_relids)
1035  relids = GetPubPartitionOptionRelations(relids,
1037  lfirst_oid(lc));
1038  }
1039 
  /* Add relations published through schemas, without duplicates. */
1040  schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1042  relids = list_concat_unique_oid(relids, schemarelids);
1043 
1044  InvalidatePublicationRels(relids);
1045  }
1046 
1047  ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1049  (Node *) stmt);
1050 
1051  InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1052 }
1053 
1054 /*
1055  * Invalidate the relations.
1056  */
1057 void
1059 {
1060  /*
1061  * We don't want to send too many individual messages, at some point it's
1062  * cheaper to just reset whole relcache.
1063  */
1064  if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1065  {
1066  ListCell *lc;
1067 
1068  foreach(lc, relids)
1070  }
1071  else
1073 }
1074 
1075 /*
1076  * Add or remove table to/from publication.
1077  */
1078 static void
1080  List *tables, const char *queryString,
1081  bool publish_schema)
1082 {
1083  List *rels = NIL;
1085  Oid pubid = pubform->oid;
1086 
1087  /*
1088  * Nothing to do if no objects, except in SET: for that it is quite
1089  * possible that user has not specified any tables in which case we need
1090  * to remove all the existing tables.
1091  */
1092  if (!tables && stmt->action != AP_SetObjects)
1093  return;
1094 
1095  rels = OpenTableList(tables);
1096 
1097  if (stmt->action == AP_AddObjects)
1098  {
1099  TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1100 
1101  publish_schema |= is_schema_publication(pubid);
1102 
1103  CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1104  pubform->pubviaroot);
1105 
1106  PublicationAddTables(pubid, rels, false, stmt);
1107  }
1108  else if (stmt->action == AP_DropObjects)
1109  PublicationDropTables(pubid, rels, false);
1110  else /* AP_SetObjects */
1111  {
1112  List *oldrelids = GetPublicationRelations(pubid,
1114  List *delrels = NIL;
1115  ListCell *oldlc;
1116 
1117  TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1118 
1119  CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1120  pubform->pubviaroot);
1121 
1122  /*
1123  * To recreate the relation list for the publication, look for
1124  * existing relations that do not need to be dropped.
1125  */
1126  foreach(oldlc, oldrelids)
1127  {
1128  Oid oldrelid = lfirst_oid(oldlc);
1129  ListCell *newlc;
1130  PublicationRelInfo *oldrel;
1131  bool found = false;
1132  HeapTuple rftuple;
1133  Node *oldrelwhereclause = NULL;
1134  Bitmapset *oldcolumns = NULL;
1135 
1136  /* look up the cache for the old relmap */
1137  rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1138  ObjectIdGetDatum(oldrelid),
1139  ObjectIdGetDatum(pubid));
1140 
1141  /*
1142  * See if the existing relation currently has a WHERE clause or a
1143  * column list. We need to compare those too.
1144  */
1145  if (HeapTupleIsValid(rftuple))
1146  {
1147  bool isnull = true;
1148  Datum whereClauseDatum;
1149  Datum columnListDatum;
1150 
1151  /* Load the WHERE clause for this table. */
1152  whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1153  Anum_pg_publication_rel_prqual,
1154  &isnull);
1155  if (!isnull)
1156  oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1157 
1158  /* Transform the int2vector column list to a bitmap. */
1159  columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1160  Anum_pg_publication_rel_prattrs,
1161  &isnull);
1162 
1163  if (!isnull)
1164  oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1165 
1166  ReleaseSysCache(rftuple);
1167  }
1168 
1169  foreach(newlc, rels)
1170  {
1171  PublicationRelInfo *newpubrel;
1172  Oid newrelid;
1173  Bitmapset *newcolumns = NULL;
1174 
1175  newpubrel = (PublicationRelInfo *) lfirst(newlc);
1176  newrelid = RelationGetRelid(newpubrel->relation);
1177 
1178  /*
1179  * If the new publication has column list, transform it to a
1180  * bitmap too.
1181  */
1182  if (newpubrel->columns)
1183  {
1184  ListCell *lc;
1185 
1186  foreach(lc, newpubrel->columns)
1187  {
1188  char *colname = strVal(lfirst(lc));
1189  AttrNumber attnum = get_attnum(newrelid, colname);
1190 
1191  newcolumns = bms_add_member(newcolumns, attnum);
1192  }
1193  }
1194 
1195  /*
1196  * Check if any of the new set of relations matches with the
1197  * existing relations in the publication. Additionally, if the
1198  * relation has an associated WHERE clause, check the WHERE
1199  * expressions also match. Same for the column list. Drop the
1200  * rest.
1201  */
1202  if (RelationGetRelid(newpubrel->relation) == oldrelid)
1203  {
1204  if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1205  bms_equal(oldcolumns, newcolumns))
1206  {
1207  found = true;
1208  break;
1209  }
1210  }
1211  }
1212 
1213  /*
1214  * Add the non-matched relations to a list so that they can be
1215  * dropped.
1216  */
1217  if (!found)
1218  {
1219  oldrel = palloc(sizeof(PublicationRelInfo));
1220  oldrel->whereClause = NULL;
1221  oldrel->columns = NIL;
1222  oldrel->relation = table_open(oldrelid,
1224  delrels = lappend(delrels, oldrel);
1225  }
1226  }
1227 
1228  /* And drop them. */
1229  PublicationDropTables(pubid, delrels, true);
1230 
1231  /*
1232  * Don't bother calculating the difference for adding, we'll catch and
1233  * skip existing ones when doing catalog update.
1234  */
1235  PublicationAddTables(pubid, rels, true, stmt);
1236 
1237  CloseTableList(delrels);
1238  }
1239 
1240  CloseTableList(rels);
1241 }
1242 
1243 /*
1244  * Alter the publication schemas.
1245  *
1246  * Add or remove schemas to/from publication.
1247  */
1248 static void
1250  HeapTuple tup, List *schemaidlist)
1251 {
1253 
1254  /*
1255  * Nothing to do if no objects, except in SET: for that it is quite
1256  * possible that user has not specified any schemas in which case we need
1257  * to remove all the existing schemas.
1258  */
1259  if (!schemaidlist && stmt->action != AP_SetObjects)
1260  return;
1261 
1262  /*
1263  * Schema lock is held until the publication is altered to prevent
1264  * concurrent schema deletion.
1265  */
1266  LockSchemaList(schemaidlist);
1267  if (stmt->action == AP_AddObjects)
1268  {
1269  ListCell *lc;
1270  List *reloids;
1271 
1272  reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1273 
1274  foreach(lc, reloids)
1275  {
1276  HeapTuple coltuple;
1277 
1278  coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1280  ObjectIdGetDatum(pubform->oid));
1281 
1282  if (!HeapTupleIsValid(coltuple))
1283  continue;
1284 
1285  /*
1286  * Disallow adding schema if column list is already part of the
1287  * publication. See CheckPubRelationColumnList.
1288  */
1289  if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1290  ereport(ERROR,
1291  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1292  errmsg("cannot add schema to publication \"%s\"",
1293  stmt->pubname),
1294  errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1295 
1296  ReleaseSysCache(coltuple);
1297  }
1298 
1299  PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1300  }
1301  else if (stmt->action == AP_DropObjects)
1302  PublicationDropSchemas(pubform->oid, schemaidlist, false);
1303  else /* AP_SetObjects */
1304  {
1305  List *oldschemaids = GetPublicationSchemas(pubform->oid);
1306  List *delschemas = NIL;
1307 
1308  /* Identify which schemas should be dropped */
1309  delschemas = list_difference_oid(oldschemaids, schemaidlist);
1310 
1311  /*
1312  * Schema lock is held until the publication is altered to prevent
1313  * concurrent schema deletion.
1314  */
1315  LockSchemaList(delschemas);
1316 
1317  /* And drop them */
1318  PublicationDropSchemas(pubform->oid, delschemas, true);
1319 
1320  /*
1321  * Don't bother calculating the difference for adding, we'll catch and
1322  * skip existing ones when doing catalog update.
1323  */
1324  PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1325  }
1326 }
1327 
1328 /*
1329  * Check if relations and schemas can be in a given publication and throw
1330  * appropriate error if not.
1331  */
1332 static void
1334  List *tables, List *schemaidlist)
1335 {
1337 
1338  if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1339  schemaidlist && !superuser())
1340  ereport(ERROR,
1341  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1342  errmsg("must be superuser to add or set schemas")));
1343 
1344  /*
1345  * Check that user is allowed to manipulate the publication tables in
1346  * schema
1347  */
1348  if (schemaidlist && pubform->puballtables)
1349  ereport(ERROR,
1350  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1351  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1352  NameStr(pubform->pubname)),
1353  errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1354 
1355  /* Check that user is allowed to manipulate the publication tables. */
1356  if (tables && pubform->puballtables)
1357  ereport(ERROR,
1358  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1359  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1360  NameStr(pubform->pubname)),
1361  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1362 }
1363 
1364 /*
1365  * Alter the existing publication.
1366  *
1367  * This is dispatcher function for AlterPublicationOptions,
1368  * AlterPublicationSchemas and AlterPublicationTables.
1369  */
1370 void
1372 {
1373  Relation rel;
1374  HeapTuple tup;
1375  Form_pg_publication pubform;
1376 
1377  rel = table_open(PublicationRelationId, RowExclusiveLock);
1378 
1379  tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1380  CStringGetDatum(stmt->pubname));
1381 
1382  if (!HeapTupleIsValid(tup))
1383  ereport(ERROR,
1384  (errcode(ERRCODE_UNDEFINED_OBJECT),
1385  errmsg("publication \"%s\" does not exist",
1386  stmt->pubname)));
1387 
1388  pubform = (Form_pg_publication) GETSTRUCT(tup);
1389 
1390  /* must be owner */
1391  if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1393  stmt->pubname);
1394 
1395  if (stmt->options)
1396  AlterPublicationOptions(pstate, stmt, rel, tup);
1397  else
1398  {
1399  List *relations = NIL;
1400  List *schemaidlist = NIL;
1401  Oid pubid = pubform->oid;
1402 
1403  ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1404  &schemaidlist);
1405 
1406  CheckAlterPublication(stmt, tup, relations, schemaidlist);
1407 
1408  heap_freetuple(tup);
1409 
1410  /* Lock the publication so nobody else can do anything with it. */
1411  LockDatabaseObject(PublicationRelationId, pubid, 0,
1413 
1414  /*
1415  * It is possible that by the time we acquire the lock on publication,
1416  * concurrent DDL has removed it. We can test this by checking the
1417  * existence of publication. We get the tuple again to avoid the risk
1418  * of any publication option getting changed.
1419  */
1420  tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1421  if (!HeapTupleIsValid(tup))
1422  ereport(ERROR,
1423  errcode(ERRCODE_UNDEFINED_OBJECT),
1424  errmsg("publication \"%s\" does not exist",
1425  stmt->pubname));
1426 
1427  AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1428  schemaidlist != NIL);
1429  AlterPublicationSchemas(stmt, tup, schemaidlist);
1430  }
1431 
1432  /* Cleanup. */
1433  heap_freetuple(tup);
1435 }
1436 
1437 /*
1438  * Remove relation from publication by mapping OID.
1439  */
1440 void
1442 {
1443  Relation rel;
1444  HeapTuple tup;
1445  Form_pg_publication_rel pubrel;
1446  List *relids = NIL;
1447 
1448  rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1449 
1450  tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1451 
1452  if (!HeapTupleIsValid(tup))
1453  elog(ERROR, "cache lookup failed for publication table %u",
1454  proid);
1455 
1456  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1457 
1458  /*
1459  * Invalidate relcache so that publication info is rebuilt.
1460  *
1461  * For the partitioned tables, we must invalidate all partitions contained
1462  * in the respective partition hierarchies, not just the one explicitly
1463  * mentioned in the publication. This is required because we implicitly
1464  * publish the child tables when the parent table is published.
1465  */
1467  pubrel->prrelid);
1468 
1469  InvalidatePublicationRels(relids);
1470 
1471  CatalogTupleDelete(rel, &tup->t_self);
1472 
1473  ReleaseSysCache(tup);
1474 
1476 }
1477 
1478 /*
1479  * Remove the publication by mapping OID.
1480  */
1481 void
1483 {
1484  Relation rel;
1485  HeapTuple tup;
1486  Form_pg_publication pubform;
1487 
1488  rel = table_open(PublicationRelationId, RowExclusiveLock);
1489 
1490  tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1491  if (!HeapTupleIsValid(tup))
1492  elog(ERROR, "cache lookup failed for publication %u", pubid);
1493 
1494  pubform = (Form_pg_publication) GETSTRUCT(tup);
1495 
1496  /* Invalidate relcache so that publication info is rebuilt. */
1497  if (pubform->puballtables)
1499 
1500  CatalogTupleDelete(rel, &tup->t_self);
1501 
1502  ReleaseSysCache(tup);
1503 
1505 }
1506 
1507 /*
1508  * Remove schema from publication by mapping OID.
1509  */
1510 void
1512 {
1513  Relation rel;
1514  HeapTuple tup;
1515  List *schemaRels = NIL;
1517 
1518  rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1519 
1520  tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1521 
1522  if (!HeapTupleIsValid(tup))
1523  elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1524 
1525  pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1526 
1527  /*
1528  * Invalidate relcache so that publication info is rebuilt. See
1529  * RemovePublicationRelById for why we need to consider all the
1530  * partitions.
1531  */
1532  schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1534  InvalidatePublicationRels(schemaRels);
1535 
1536  CatalogTupleDelete(rel, &tup->t_self);
1537 
1538  ReleaseSysCache(tup);
1539 
1541 }
1542 
1543 /*
1544  * Open relations specified by a PublicationTable list.
1545  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1546  * add them to a publication.
1547  */
1548 static List *
1550 {
1551  List *relids = NIL;
1552  List *rels = NIL;
1553  ListCell *lc;
1554  List *relids_with_rf = NIL;
1555  List *relids_with_collist = NIL;
1556 
1557  /*
1558  * Open, share-lock, and check all the explicitly-specified relations
1559  */
1560  foreach(lc, tables)
1561  {
1563  bool recurse = t->relation->inh;
1564  Relation rel;
1565  Oid myrelid;
1566  PublicationRelInfo *pub_rel;
1567 
1568  /* Allow query cancel in case this takes a long time */
1570 
1572  myrelid = RelationGetRelid(rel);
1573 
1574  /*
1575  * Filter out duplicates if user specifies "foo, foo".
1576  *
1577  * Note that this algorithm is known to not be very efficient (O(N^2))
1578  * but given that it only works on list of tables given to us by user
1579  * it's deemed acceptable.
1580  */
1581  if (list_member_oid(relids, myrelid))
1582  {
1583  /* Disallow duplicate tables if there are any with row filters. */
1584  if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1585  ereport(ERROR,
1587  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1588  RelationGetRelationName(rel))));
1589 
1590  /* Disallow duplicate tables if there are any with column lists. */
1591  if (t->columns || list_member_oid(relids_with_collist, myrelid))
1592  ereport(ERROR,
1594  errmsg("conflicting or redundant column lists for table \"%s\"",
1595  RelationGetRelationName(rel))));
1596 
1598  continue;
1599  }
1600 
1601  pub_rel = palloc(sizeof(PublicationRelInfo));
1602  pub_rel->relation = rel;
1603  pub_rel->whereClause = t->whereClause;
1604  pub_rel->columns = t->columns;
1605  rels = lappend(rels, pub_rel);
1606  relids = lappend_oid(relids, myrelid);
1607 
1608  if (t->whereClause)
1609  relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1610 
1611  if (t->columns)
1612  relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1613 
1614  /*
1615  * Add children of this rel, if requested, so that they too are added
1616  * to the publication. A partitioned table can't have any inheritance
1617  * children other than its partitions, which need not be explicitly
1618  * added to the publication.
1619  */
1620  if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1621  {
1622  List *children;
1623  ListCell *child;
1624 
1625  children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1626  NULL);
1627 
1628  foreach(child, children)
1629  {
1630  Oid childrelid = lfirst_oid(child);
1631 
1632  /* Allow query cancel in case this takes a long time */
1634 
1635  /*
1636  * Skip duplicates if user specified both parent and child
1637  * tables.
1638  */
1639  if (list_member_oid(relids, childrelid))
1640  {
1641  /*
1642  * We don't allow to specify row filter for both parent
1643  * and child table at the same time as it is not very
1644  * clear which one should be given preference.
1645  */
1646  if (childrelid != myrelid &&
1647  (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1648  ereport(ERROR,
1650  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1651  RelationGetRelationName(rel))));
1652 
1653  /*
1654  * We don't allow to specify column list for both parent
1655  * and child table at the same time as it is not very
1656  * clear which one should be given preference.
1657  */
1658  if (childrelid != myrelid &&
1659  (t->columns || list_member_oid(relids_with_collist, childrelid)))
1660  ereport(ERROR,
1662  errmsg("conflicting or redundant column lists for table \"%s\"",
1663  RelationGetRelationName(rel))));
1664 
1665  continue;
1666  }
1667 
1668  /* find_all_inheritors already got lock */
1669  rel = table_open(childrelid, NoLock);
1670  pub_rel = palloc(sizeof(PublicationRelInfo));
1671  pub_rel->relation = rel;
1672  /* child inherits WHERE clause from parent */
1673  pub_rel->whereClause = t->whereClause;
1674 
1675  /* child inherits column list from parent */
1676  pub_rel->columns = t->columns;
1677  rels = lappend(rels, pub_rel);
1678  relids = lappend_oid(relids, childrelid);
1679 
1680  if (t->whereClause)
1681  relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1682 
1683  if (t->columns)
1684  relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1685  }
1686  }
1687  }
1688 
1689  list_free(relids);
1690  list_free(relids_with_rf);
1691 
1692  return rels;
1693 }
1694 
1695 /*
1696  * Close all relations in the list.
1697  */
1698 static void
1700 {
1701  ListCell *lc;
1702 
1703  foreach(lc, rels)
1704  {
1705  PublicationRelInfo *pub_rel;
1706 
1707  pub_rel = (PublicationRelInfo *) lfirst(lc);
1708  table_close(pub_rel->relation, NoLock);
1709  }
1710 
1711  list_free_deep(rels);
1712 }
1713 
1714 /*
1715  * Lock the schemas specified in the schema list in AccessShareLock mode in
1716  * order to prevent concurrent schema deletion.
1717  */
1718 static void
1719 LockSchemaList(List *schemalist)
1720 {
1721  ListCell *lc;
1722 
1723  foreach(lc, schemalist)
1724  {
1725  Oid schemaid = lfirst_oid(lc);
1726 
1727  /* Allow query cancel in case this takes a long time */
1729  LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1730 
1731  /*
1732  * It is possible that by the time we acquire the lock on schema,
1733  * concurrent DDL has removed it. We can test this by checking the
1734  * existence of schema.
1735  */
1736  if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1737  ereport(ERROR,
1738  errcode(ERRCODE_UNDEFINED_SCHEMA),
1739  errmsg("schema with OID %u does not exist", schemaid));
1740  }
1741 }
1742 
1743 /*
1744  * Add listed tables to the publication.
1745  */
1746 static void
1747 PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1749 {
1750  ListCell *lc;
1751 
1752  Assert(!stmt || !stmt->for_all_tables);
1753 
1754  foreach(lc, rels)
1755  {
1756  PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1757  Relation rel = pub_rel->relation;
1758  ObjectAddress obj;
1759 
1760  /* Must be owner of the table or superuser. */
1761  if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1764 
1765  obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1766  if (stmt)
1767  {
1769  (Node *) stmt);
1770 
1771  InvokeObjectPostCreateHook(PublicationRelRelationId,
1772  obj.objectId, 0);
1773  }
1774  }
1775 }
1776 
1777 /*
1778  * Remove listed tables from the publication.
1779  */
1780 static void
1781 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1782 {
1783  ObjectAddress obj;
1784  ListCell *lc;
1785  Oid prid;
1786 
1787  foreach(lc, rels)
1788  {
1789  PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1790  Relation rel = pubrel->relation;
1791  Oid relid = RelationGetRelid(rel);
1792 
1793  if (pubrel->columns)
1794  ereport(ERROR,
1795  errcode(ERRCODE_SYNTAX_ERROR),
1796  errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1797 
1798  prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1799  ObjectIdGetDatum(relid),
1800  ObjectIdGetDatum(pubid));
1801  if (!OidIsValid(prid))
1802  {
1803  if (missing_ok)
1804  continue;
1805 
1806  ereport(ERROR,
1807  (errcode(ERRCODE_UNDEFINED_OBJECT),
1808  errmsg("relation \"%s\" is not part of the publication",
1809  RelationGetRelationName(rel))));
1810  }
1811 
1812  if (pubrel->whereClause)
1813  ereport(ERROR,
1814  (errcode(ERRCODE_SYNTAX_ERROR),
1815  errmsg("cannot use a WHERE clause when removing a table from a publication")));
1816 
1817  ObjectAddressSet(obj, PublicationRelRelationId, prid);
1818  performDeletion(&obj, DROP_CASCADE, 0);
1819  }
1820 }
1821 
1822 /*
1823  * Add listed schemas to the publication.
1824  */
1825 static void
1826 PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1828 {
1829  ListCell *lc;
1830 
1831  Assert(!stmt || !stmt->for_all_tables);
1832 
1833  foreach(lc, schemas)
1834  {
1835  Oid schemaid = lfirst_oid(lc);
1836  ObjectAddress obj;
1837 
1838  obj = publication_add_schema(pubid, schemaid, if_not_exists);
1839  if (stmt)
1840  {
1842  (Node *) stmt);
1843 
1844  InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1845  obj.objectId, 0);
1846  }
1847  }
1848 }
1849 
1850 /*
1851  * Remove listed schemas from the publication.
1852  */
1853 static void
1854 PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1855 {
1856  ObjectAddress obj;
1857  ListCell *lc;
1858  Oid psid;
1859 
1860  foreach(lc, schemas)
1861  {
1862  Oid schemaid = lfirst_oid(lc);
1863 
1864  psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
1865  Anum_pg_publication_namespace_oid,
1866  ObjectIdGetDatum(schemaid),
1867  ObjectIdGetDatum(pubid));
1868  if (!OidIsValid(psid))
1869  {
1870  if (missing_ok)
1871  continue;
1872 
1873  ereport(ERROR,
1874  (errcode(ERRCODE_UNDEFINED_OBJECT),
1875  errmsg("tables from schema \"%s\" are not part of the publication",
1876  get_namespace_name(schemaid))));
1877  }
1878 
1879  ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1880  performDeletion(&obj, DROP_CASCADE, 0);
1881  }
1882 }
1883 
1884 /*
1885  * Internal workhorse for changing a publication owner
1886  */
1887 static void
1889 {
1890  Form_pg_publication form;
1891 
1892  form = (Form_pg_publication) GETSTRUCT(tup);
1893 
1894  if (form->pubowner == newOwnerId)
1895  return;
1896 
1897  if (!superuser())
1898  {
1899  AclResult aclresult;
1900 
1901  /* Must be owner */
1902  if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
1904  NameStr(form->pubname));
1905 
1906  /* Must be able to become new owner */
1907  check_can_set_role(GetUserId(), newOwnerId);
1908 
1909  /* New owner must have CREATE privilege on database */
1910  aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
1911  if (aclresult != ACLCHECK_OK)
1912  aclcheck_error(aclresult, OBJECT_DATABASE,
1914 
1915  if (form->puballtables && !superuser_arg(newOwnerId))
1916  ereport(ERROR,
1917  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1918  errmsg("permission denied to change owner of publication \"%s\"",
1919  NameStr(form->pubname)),
1920  errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1921 
1922  if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
1923  ereport(ERROR,
1924  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1925  errmsg("permission denied to change owner of publication \"%s\"",
1926  NameStr(form->pubname)),
1927  errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1928  }
1929 
1930  form->pubowner = newOwnerId;
1931  CatalogTupleUpdate(rel, &tup->t_self, tup);
1932 
1933  /* Update owner dependency reference */
1934  changeDependencyOnOwner(PublicationRelationId,
1935  form->oid,
1936  newOwnerId);
1937 
1938  InvokeObjectPostAlterHook(PublicationRelationId,
1939  form->oid, 0);
1940 }
1941 
1942 /*
1943  * Change publication owner -- by name
1944  */
1946 AlterPublicationOwner(const char *name, Oid newOwnerId)
1947 {
1948  Oid subid;
1949  HeapTuple tup;
1950  Relation rel;
1951  ObjectAddress address;
1952  Form_pg_publication pubform;
1953 
1954  rel = table_open(PublicationRelationId, RowExclusiveLock);
1955 
1956  tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
1957 
1958  if (!HeapTupleIsValid(tup))
1959  ereport(ERROR,
1960  (errcode(ERRCODE_UNDEFINED_OBJECT),
1961  errmsg("publication \"%s\" does not exist", name)));
1962 
1963  pubform = (Form_pg_publication) GETSTRUCT(tup);
1964  subid = pubform->oid;
1965 
1966  AlterPublicationOwner_internal(rel, tup, newOwnerId);
1967 
1968  ObjectAddressSet(address, PublicationRelationId, subid);
1969 
1970  heap_freetuple(tup);
1971 
1973 
1974  return address;
1975 }
1976 
1977 /*
1978  * Change publication owner -- by OID
1979  */
1980 void
1982 {
1983  HeapTuple tup;
1984  Relation rel;
1985 
1986  rel = table_open(PublicationRelationId, RowExclusiveLock);
1987 
1988  tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
1989 
1990  if (!HeapTupleIsValid(tup))
1991  ereport(ERROR,
1992  (errcode(ERRCODE_UNDEFINED_OBJECT),
1993  errmsg("publication with OID %u does not exist", subid)));
1994 
1995  AlterPublicationOwner_internal(rel, tup, newOwnerId);
1996 
1997  heap_freetuple(tup);
1998 
2000 }
void check_can_set_role(Oid member, Oid role)
Definition: acl.c:5185
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
@ ACLCHECK_NOT_OWNER
Definition: acl.h:185
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2688
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition: aclchk.c:3876
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition: aclchk.c:4130
int16 AttrNumber
Definition: attnum.h:21
#define InvalidAttrNumber
Definition: attnum.h:23
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:142
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1306
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:815
static Datum values[MAXATTR]
Definition: bootstrap.c:152
#define TextDatumGetCString(d)
Definition: builtins.h:98
#define NameStr(name)
Definition: c.h:746
#define Assert(condition)
Definition: c.h:858
#define OidIsValid(objectId)
Definition: c.h:775
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:391
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3153
bool defGetBoolean(DefElem *def)
Definition: define.c:107
char * defGetString(DefElem *def)
Definition: define.c:48
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition: define.c:384
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:273
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1232
int errdetail(const char *fmt,...)
Definition: elog.c:1205
int errhint(const char *fmt,...)
Definition: elog.c:1319
int errcode(int sqlerrcode)
Definition: elog.c:859
int errmsg(const char *fmt,...)
Definition: elog.c:1072
#define _(x)
Definition: elog.c:90
#define WARNING
Definition: elog.h:36
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
#define ereport(elevel,...)
Definition: elog.h:149
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:642
Oid MyDatabaseId
Definition: globals.c:91
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1209
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
Definition: heaptuple.c:455
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1434
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
#define stmt
Definition: indent_codes.h:59
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1419
void CacheInvalidateRelcacheAll(void)
Definition: inval.c:1384
int x
Definition: isn.c:71
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
List * list_difference_oid(const List *list1, const List *list2)
Definition: list.c:1313
List * list_concat_unique_oid(List *list1, const List *list2)
Definition: list.c:1469
List * lappend(List *list, void *datum)
Definition: list.c:339
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
void list_free(List *list)
Definition: list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
List * list_append_unique_oid(List *list, Oid datum)
Definition: list.c:1380
void list_free_deep(List *list)
Definition: list.c:1560
void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1004
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
#define RowExclusiveLock
Definition: lockdefs.h:38
AttrNumber get_attnum(Oid relid, const char *attname)
Definition: lsyscache.c:858
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3344
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1981
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1906
char func_volatile(Oid funcid)
Definition: lsyscache.c:1758
char * get_attname(Oid relid, AttrNumber attnum, bool missing_ok)
Definition: lsyscache.c:827
void * palloc(Size size)
Definition: mcxt.c:1316
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
Oid GetUserId(void)
Definition: miscinit.c:514
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
List * fetch_search_path(bool includeImplicit)
Definition: namespace.c:4795
Oid get_namespace_oid(const char *nspname, bool missing_ok)
Definition: namespace.c:3520
Oid exprType(const Node *expr)
Definition: nodeFuncs.c:42
Oid exprInputCollation(const Node *expr)
Definition: nodeFuncs.c:1071
bool check_functions_in_node(Node *node, check_function_callback checker, void *context)
Definition: nodeFuncs.c:1906
Oid exprCollation(const Node *expr)
Definition: nodeFuncs.c:816
int exprLocation(const Node *expr)
Definition: nodeFuncs.c:1386
#define expression_tree_walker(n, w, c)
Definition: nodeFuncs.h:151
#define IsA(nodeptr, _type_)
Definition: nodes.h:158
#define copyObject(obj)
Definition: nodes.h:224
#define nodeTag(nodeptr)
Definition: nodes.h:133
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:173
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:197
ObjectType get_relkind_objtype(char relkind)
const ObjectAddress InvalidObjectAddress
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
Node * transformWhereClause(ParseState *pstate, Node *clause, ParseExprKind exprKind, const char *constructName)
void assign_expr_collations(ParseState *pstate, Node *expr)
void free_parsestate(ParseState *pstate)
Definition: parse_node.c:72
int parser_errposition(ParseState *pstate, int location)
Definition: parse_node.c:106
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:39
@ EXPR_KIND_WHERE
Definition: parse_node.h:46
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
void addNSItemToQuery(ParseState *pstate, ParseNamespaceItem *nsitem, bool addToJoinList, bool addToRelNameSpace, bool addToVarNameSpace)
@ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA
Definition: parsenodes.h:4151
@ PUBLICATIONOBJ_TABLES_IN_SCHEMA
Definition: parsenodes.h:4150
@ PUBLICATIONOBJ_TABLE
Definition: parsenodes.h:4149
@ AP_DropObjects
Definition: parsenodes.h:4177
@ AP_SetObjects
Definition: parsenodes.h:4178
@ AP_AddObjects
Definition: parsenodes.h:4176
@ DROP_CASCADE
Definition: parsenodes.h:2335
@ OBJECT_DATABASE
Definition: parsenodes.h:2270
@ OBJECT_PUBLICATION
Definition: parsenodes.h:2291
#define ACL_CREATE
Definition: parsenodes.h:85
int16 attnum
Definition: pg_attribute.h:74
NameData relname
Definition: pg_class.h:38
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:255
#define lfirst(lc)
Definition: pg_list.h:172
#define lfirst_node(type, lc)
Definition: pg_list.h:176
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define linitial_oid(l)
Definition: pg_list.h:180
#define lfirst_oid(lc)
Definition: pg_list.h:174
bool is_schema_publication(Oid pubid)
List * GetPublicationSchemas(Oid pubid)
ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists)
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
List * GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
FormData_pg_publication * Form_pg_publication
@ PUBLICATION_PART_ROOT
@ PUBLICATION_PART_ALL
FormData_pg_publication_namespace * Form_pg_publication_namespace
FormData_pg_publication_rel * Form_pg_publication_rel
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:308
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:160
uintptr_t Datum
Definition: postgres.h:64
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:350
unsigned int Oid
Definition: postgres_ext.h:31
struct rf_context rf_context
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt)
static bool contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
static void AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, List *schemaidlist)
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
void InvalidatePublicationRels(List *relids)
static void CloseTableList(List *rels)
void RemovePublicationSchemaById(Oid psoid)
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
static void TransformPubWhereClauses(List *tables, const char *queryString, bool pubviaroot)
ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root)
bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
bool pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, List **rels, List **schemas)
static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist)
static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context)
void RemovePublicationById(Oid pubid)
static List * OpenTableList(List *tables)
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
static bool check_simple_rowfilter_expr(Node *node, ParseState *pstate)
void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
void RemovePublicationRelById(Oid proid)
ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId)
static void CheckPubRelationColumnList(char *pubname, List *tables, bool publish_schema, bool pubviaroot)
void AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
static void AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, const char *queryString, bool publish_schema)
static void LockSchemaList(List *schemalist)
static bool check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
#define MAX_RELCACHE_INVAL_MSGS
tree context
Definition: radixtree.h:1829
void * stringToNode(const char *str)
Definition: read.c:90
#define RelationGetRelid(relation)
Definition: rel.h:505
#define RelationGetDescr(relation)
Definition: rel.h:531
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationGetNamespace(relation)
Definition: rel.h:546
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition: relcache.c:5231
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
Definition: relcache.h:63
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
char * defname
Definition: parsenodes.h:815
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
Definition: nodes.h:129
const char * p_sourcetext
Definition: parse_node.h:193
PublicationObjSpecType pubobjtype
Definition: parsenodes.h:4159
PublicationTable * pubtable
Definition: parsenodes.h:4161
RangeVar * relation
Definition: parsenodes.h:4139
bool inh
Definition: primnodes.h:85
Form_pg_class rd_rel
Definition: rel.h:111
Definition: primnodes.h:248
AttrNumber varattno
Definition: primnodes.h:260
Bitmapset * bms_replident
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
bool superuser(void)
Definition: superuser.c:46
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:266
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:218
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:479
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:229
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:86
#define SearchSysCacheExists1(cacheId, key1)
Definition: syscache.h:95
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:104
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:106
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: table.c:83
#define FirstNormalObjectId
Definition: transam.h:197
#define strVal(v)
Definition: value.h:82
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3457
const char * name
void CommandCounterIncrement(void)
Definition: xact.c:1097
int wal_level
Definition: xlog.c:131
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:74