PostgreSQL Source Code  git master
publicationcmds.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * publicationcmds.c
4  * publication manipulation
5  *
6  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  * src/backend/commands/publicationcmds.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/htup_details.h"
18 #include "access/table.h"
19 #include "access/xact.h"
20 #include "catalog/catalog.h"
21 #include "catalog/indexing.h"
22 #include "catalog/namespace.h"
23 #include "catalog/objectaccess.h"
24 #include "catalog/objectaddress.h"
25 #include "catalog/pg_database.h"
26 #include "catalog/pg_inherits.h"
27 #include "catalog/pg_namespace.h"
28 #include "catalog/pg_proc.h"
29 #include "catalog/pg_publication.h"
32 #include "commands/dbcommands.h"
33 #include "commands/defrem.h"
34 #include "commands/event_trigger.h"
36 #include "miscadmin.h"
37 #include "nodes/nodeFuncs.h"
38 #include "parser/parse_clause.h"
39 #include "parser/parse_collate.h"
40 #include "parser/parse_relation.h"
41 #include "storage/lmgr.h"
42 #include "utils/acl.h"
43 #include "utils/builtins.h"
44 #include "utils/inval.h"
45 #include "utils/lsyscache.h"
46 #include "utils/rel.h"
47 #include "utils/syscache.h"
48 #include "utils/varlena.h"
49 
50 
51 /*
52  * Information used to validate the columns in the row filter expression. See
53  * contain_invalid_rfcolumn_walker for details.
54  */
55 typedef struct rf_context
56 {
57  Bitmapset *bms_replident; /* bitset of replica identity columns */
58  bool pubviaroot; /* true if we are validating the parent
59  * relation's row filter */
60  Oid relid; /* relid of the relation */
61  Oid parentid; /* relid of the parent relation */
63 
64 static List *OpenTableList(List *tables);
65 static void CloseTableList(List *rels);
66 static void LockSchemaList(List *schemalist);
67 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
69 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
72 static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73 
74 
75 static void
77  List *options,
78  bool *publish_given,
79  PublicationActions *pubactions,
80  bool *publish_via_partition_root_given,
81  bool *publish_via_partition_root,
82  bool *publish_generated_columns_given,
83  bool *publish_generated_columns)
84 {
85  ListCell *lc;
86 
87  *publish_given = false;
88  *publish_via_partition_root_given = false;
89  *publish_generated_columns_given = false;
90 
91  /* defaults */
92  pubactions->pubinsert = true;
93  pubactions->pubupdate = true;
94  pubactions->pubdelete = true;
95  pubactions->pubtruncate = true;
96  *publish_via_partition_root = false;
97  *publish_generated_columns = false;
98 
99  /* Parse options */
100  foreach(lc, options)
101  {
102  DefElem *defel = (DefElem *) lfirst(lc);
103 
104  if (strcmp(defel->defname, "publish") == 0)
105  {
106  char *publish;
107  List *publish_list;
108  ListCell *lc2;
109 
110  if (*publish_given)
111  errorConflictingDefElem(defel, pstate);
112 
113  /*
114  * If publish option was given only the explicitly listed actions
115  * should be published.
116  */
117  pubactions->pubinsert = false;
118  pubactions->pubupdate = false;
119  pubactions->pubdelete = false;
120  pubactions->pubtruncate = false;
121 
122  *publish_given = true;
123  publish = defGetString(defel);
124 
125  if (!SplitIdentifierString(publish, ',', &publish_list))
126  ereport(ERROR,
127  (errcode(ERRCODE_SYNTAX_ERROR),
128  errmsg("invalid list syntax in parameter \"%s\"",
129  "publish")));
130 
131  /* Process the option list. */
132  foreach(lc2, publish_list)
133  {
134  char *publish_opt = (char *) lfirst(lc2);
135 
136  if (strcmp(publish_opt, "insert") == 0)
137  pubactions->pubinsert = true;
138  else if (strcmp(publish_opt, "update") == 0)
139  pubactions->pubupdate = true;
140  else if (strcmp(publish_opt, "delete") == 0)
141  pubactions->pubdelete = true;
142  else if (strcmp(publish_opt, "truncate") == 0)
143  pubactions->pubtruncate = true;
144  else
145  ereport(ERROR,
146  (errcode(ERRCODE_SYNTAX_ERROR),
147  errmsg("unrecognized value for publication option \"%s\": \"%s\"",
148  "publish", publish_opt)));
149  }
150  }
151  else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
152  {
153  if (*publish_via_partition_root_given)
154  errorConflictingDefElem(defel, pstate);
155  *publish_via_partition_root_given = true;
156  *publish_via_partition_root = defGetBoolean(defel);
157  }
158  else if (strcmp(defel->defname, "publish_generated_columns") == 0)
159  {
160  if (*publish_generated_columns_given)
161  errorConflictingDefElem(defel, pstate);
162  *publish_generated_columns_given = true;
163  *publish_generated_columns = defGetBoolean(defel);
164  }
165  else
166  ereport(ERROR,
167  (errcode(ERRCODE_SYNTAX_ERROR),
168  errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
169  }
170 }
171 
172 /*
173  * Convert the PublicationObjSpecType list into schema oid list and
174  * PublicationTable list.
175  */
176 static void
177 ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
178  List **rels, List **schemas)
179 {
180  ListCell *cell;
181  PublicationObjSpec *pubobj;
182 
183  if (!pubobjspec_list)
184  return;
185 
186  foreach(cell, pubobjspec_list)
187  {
188  Oid schemaid;
189  List *search_path;
190 
191  pubobj = (PublicationObjSpec *) lfirst(cell);
192 
193  switch (pubobj->pubobjtype)
194  {
196  *rels = lappend(*rels, pubobj->pubtable);
197  break;
199  schemaid = get_namespace_oid(pubobj->name, false);
200 
201  /* Filter out duplicates if user specifies "sch1, sch1" */
202  *schemas = list_append_unique_oid(*schemas, schemaid);
203  break;
205  search_path = fetch_search_path(false);
206  if (search_path == NIL) /* nothing valid in search_path? */
207  ereport(ERROR,
208  errcode(ERRCODE_UNDEFINED_SCHEMA),
209  errmsg("no schema has been selected for CURRENT_SCHEMA"));
210 
211  schemaid = linitial_oid(search_path);
212  list_free(search_path);
213 
214  /* Filter out duplicates if user specifies "sch1, sch1" */
215  *schemas = list_append_unique_oid(*schemas, schemaid);
216  break;
217  default:
218  /* shouldn't happen */
219  elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
220  break;
221  }
222  }
223 }
224 
225 /*
226  * Returns true if any of the columns used in the row filter WHERE expression is
227  * not part of REPLICA IDENTITY, false otherwise.
228  */
229 static bool
231 {
232  if (node == NULL)
233  return false;
234 
235  if (IsA(node, Var))
236  {
237  Var *var = (Var *) node;
238  AttrNumber attnum = var->varattno;
239 
240  /*
241  * If pubviaroot is true, we are validating the row filter of the
242  * parent table, but the bitmap contains the replica identity
243  * information of the child table. So, get the column number of the
244  * child table as parent and child column order could be different.
245  */
246  if (context->pubviaroot)
247  {
248  char *colname = get_attname(context->parentid, attnum, false);
249 
250  attnum = get_attnum(context->relid, colname);
251  }
252 
254  context->bms_replident))
255  return true;
256  }
257 
259  context);
260 }
261 
262 /*
263  * Check if all columns referenced in the filter expression are part of the
264  * REPLICA IDENTITY index or not.
265  *
266  * Returns true if any invalid column is found.
267  */
268 bool
269 pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
270  bool pubviaroot)
271 {
272  HeapTuple rftuple;
273  Oid relid = RelationGetRelid(relation);
274  Oid publish_as_relid = RelationGetRelid(relation);
275  bool result = false;
276  Datum rfdatum;
277  bool rfisnull;
278 
279  /*
280  * FULL means all columns are in the REPLICA IDENTITY, so all columns are
281  * allowed in the row filter and we can skip the validation.
282  */
283  if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
284  return false;
285 
286  /*
287  * For a partition, if pubviaroot is true, find the topmost ancestor that
288  * is published via this publication as we need to use its row filter
289  * expression to filter the partition's changes.
290  *
291  * Note that even though the row filter used is for an ancestor, the
292  * REPLICA IDENTITY used will be for the actual child table.
293  */
294  if (pubviaroot && relation->rd_rel->relispartition)
295  {
296  publish_as_relid
297  = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
298 
299  if (!OidIsValid(publish_as_relid))
300  publish_as_relid = relid;
301  }
302 
303  rftuple = SearchSysCache2(PUBLICATIONRELMAP,
304  ObjectIdGetDatum(publish_as_relid),
305  ObjectIdGetDatum(pubid));
306 
307  if (!HeapTupleIsValid(rftuple))
308  return false;
309 
310  rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
311  Anum_pg_publication_rel_prqual,
312  &rfisnull);
313 
314  if (!rfisnull)
315  {
316  rf_context context = {0};
317  Node *rfnode;
318  Bitmapset *bms = NULL;
319 
320  context.pubviaroot = pubviaroot;
321  context.parentid = publish_as_relid;
322  context.relid = relid;
323 
324  /* Remember columns that are part of the REPLICA IDENTITY */
325  bms = RelationGetIndexAttrBitmap(relation,
327 
328  context.bms_replident = bms;
329  rfnode = stringToNode(TextDatumGetCString(rfdatum));
330  result = contain_invalid_rfcolumn_walker(rfnode, &context);
331  }
332 
333  ReleaseSysCache(rftuple);
334 
335  return result;
336 }
337 
338 /*
339  * Check for invalid columns in the publication table definition.
340  *
341  * This function evaluates two conditions:
342  *
343  * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
344  * by the column list. If any column is missing, *invalid_column_list is set
345  * to true.
346  * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
347  * are published either by listing them in the column list or by enabling
348  * publish_generated_columns option. If any unpublished generated column is
349  * found, *invalid_gen_col is set to true.
350  *
351  * Returns true if any of the above conditions are not met.
352  */
353 bool
354 pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
355  bool pubviaroot, bool pubgencols,
356  bool *invalid_column_list,
357  bool *invalid_gen_col)
358 {
359  Oid relid = RelationGetRelid(relation);
360  Oid publish_as_relid = RelationGetRelid(relation);
361  Bitmapset *idattrs;
362  Bitmapset *columns = NULL;
363  TupleDesc desc = RelationGetDescr(relation);
364  Publication *pub;
365  int x;
366 
367  *invalid_column_list = false;
368  *invalid_gen_col = false;
369 
370  /*
371  * For a partition, if pubviaroot is true, find the topmost ancestor that
372  * is published via this publication as we need to use its column list for
373  * the changes.
374  *
375  * Note that even though the column list used is for an ancestor, the
376  * REPLICA IDENTITY used will be for the actual child table.
377  */
378  if (pubviaroot && relation->rd_rel->relispartition)
379  {
380  publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
381 
382  if (!OidIsValid(publish_as_relid))
383  publish_as_relid = relid;
384  }
385 
386  /* Fetch the column list */
387  pub = GetPublication(pubid);
388  check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
389 
390  if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
391  {
392  /* With REPLICA IDENTITY FULL, no column list is allowed. */
393  *invalid_column_list = (columns != NULL);
394 
395  /*
396  * As we don't allow a column list with REPLICA IDENTITY FULL, the
397  * publish_generated_columns option must be set to true if the table
398  * has any stored generated columns.
399  */
400  if (!pubgencols &&
401  relation->rd_att->constr &&
402  relation->rd_att->constr->has_generated_stored)
403  *invalid_gen_col = true;
404 
405  if (*invalid_gen_col && *invalid_column_list)
406  return true;
407  }
408 
409  /* Remember columns that are part of the REPLICA IDENTITY */
410  idattrs = RelationGetIndexAttrBitmap(relation,
412 
413  /*
414  * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
415  * (to handle system columns the usual way), while column list does not
416  * use offset, so we can't do bms_is_subset(). Instead, we have to loop
417  * over the idattrs and check all of them are in the list.
418  */
419  x = -1;
420  while ((x = bms_next_member(idattrs, x)) >= 0)
421  {
423  Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
424 
425  if (columns == NULL)
426  {
427  /*
428  * The publish_generated_columns option must be set to true if the
429  * REPLICA IDENTITY contains any stored generated column.
430  */
431  if (!pubgencols && att->attgenerated)
432  {
433  *invalid_gen_col = true;
434  break;
435  }
436 
437  /* Skip validating the column list since it is not defined */
438  continue;
439  }
440 
441  /*
442  * If pubviaroot is true, we are validating the column list of the
443  * parent table, but the bitmap contains the replica identity
444  * information of the child table. The parent/child attnums may not
445  * match, so translate them to the parent - get the attname from the
446  * child, and look it up in the parent.
447  */
448  if (pubviaroot)
449  {
450  /* attribute name in the child table */
451  char *colname = get_attname(relid, attnum, false);
452 
453  /*
454  * Determine the attnum for the attribute name in parent (we are
455  * using the column list defined on the parent).
456  */
457  attnum = get_attnum(publish_as_relid, colname);
458  }
459 
460  /* replica identity column, not covered by the column list */
461  *invalid_column_list |= !bms_is_member(attnum, columns);
462 
463  if (*invalid_column_list && *invalid_gen_col)
464  break;
465  }
466 
467  bms_free(columns);
468  bms_free(idattrs);
469 
470  return *invalid_column_list || *invalid_gen_col;
471 }
472 
473 /* check_functions_in_node callback */
474 static bool
476 {
477  return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
478  func_id >= FirstNormalObjectId);
479 }
480 
481 /*
482  * The row filter walker checks if the row filter expression is a "simple
483  * expression".
484  *
485  * It allows only simple or compound expressions such as:
486  * - (Var Op Const)
487  * - (Var Op Var)
488  * - (Var Op Const) AND/OR (Var Op Const)
489  * - etc
490  * (where Var is a column of the table this filter belongs to)
491  *
492  * The simple expression has the following restrictions:
493  * - User-defined operators are not allowed;
494  * - User-defined functions are not allowed;
495  * - User-defined types are not allowed;
496  * - User-defined collations are not allowed;
497  * - Non-immutable built-in functions are not allowed;
498  * - System columns are not allowed.
499  *
500  * NOTES
501  *
502  * We don't allow user-defined functions/operators/types/collations because
503  * (a) if a user drops a user-defined object used in a row filter expression or
504  * if there is any other error while using it, the logical decoding
505  * infrastructure won't be able to recover from such an error even if the
506  * object is recreated again because a historic snapshot is used to evaluate
507  * the row filter;
508  * (b) a user-defined function can be used to access tables that could have
509  * unpleasant results because a historic snapshot is used. That's why only
510  * immutable built-in functions are allowed in row filter expressions.
511  *
512  * We don't allow system columns because currently, we don't have that
513  * information in the tuple passed to downstream. Also, as we don't replicate
514  * those to subscribers, there doesn't seem to be a need for a filter on those
515  * columns.
516  *
517  * We can allow other node types after more analysis and testing.
518  */
519 static bool
521 {
522  char *errdetail_msg = NULL;
523 
524  if (node == NULL)
525  return false;
526 
527  switch (nodeTag(node))
528  {
529  case T_Var:
530  /* System columns are not allowed. */
531  if (((Var *) node)->varattno < InvalidAttrNumber)
532  errdetail_msg = _("System columns are not allowed.");
533  break;
534  case T_OpExpr:
535  case T_DistinctExpr:
536  case T_NullIfExpr:
537  /* OK, except user-defined operators are not allowed. */
538  if (((OpExpr *) node)->opno >= FirstNormalObjectId)
539  errdetail_msg = _("User-defined operators are not allowed.");
540  break;
541  case T_ScalarArrayOpExpr:
542  /* OK, except user-defined operators are not allowed. */
543  if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
544  errdetail_msg = _("User-defined operators are not allowed.");
545 
546  /*
547  * We don't need to check the hashfuncid and negfuncid of
548  * ScalarArrayOpExpr as those functions are only built for a
549  * subquery.
550  */
551  break;
552  case T_RowCompareExpr:
553  {
554  ListCell *opid;
555 
556  /* OK, except user-defined operators are not allowed. */
557  foreach(opid, ((RowCompareExpr *) node)->opnos)
558  {
559  if (lfirst_oid(opid) >= FirstNormalObjectId)
560  {
561  errdetail_msg = _("User-defined operators are not allowed.");
562  break;
563  }
564  }
565  }
566  break;
567  case T_Const:
568  case T_FuncExpr:
569  case T_BoolExpr:
570  case T_RelabelType:
571  case T_CollateExpr:
572  case T_CaseExpr:
573  case T_CaseTestExpr:
574  case T_ArrayExpr:
575  case T_RowExpr:
576  case T_CoalesceExpr:
577  case T_MinMaxExpr:
578  case T_XmlExpr:
579  case T_NullTest:
580  case T_BooleanTest:
581  case T_List:
582  /* OK, supported */
583  break;
584  default:
585  errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
586  break;
587  }
588 
589  /*
590  * For all the supported nodes, if we haven't already found a problem,
591  * check the types, functions, and collations used in it. We check List
592  * by walking through each element.
593  */
594  if (!errdetail_msg && !IsA(node, List))
595  {
596  if (exprType(node) >= FirstNormalObjectId)
597  errdetail_msg = _("User-defined types are not allowed.");
599  pstate))
600  errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
601  else if (exprCollation(node) >= FirstNormalObjectId ||
603  errdetail_msg = _("User-defined collations are not allowed.");
604  }
605 
606  /*
607  * If we found a problem in this node, throw error now. Otherwise keep
608  * going.
609  */
610  if (errdetail_msg)
611  ereport(ERROR,
612  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
613  errmsg("invalid publication WHERE expression"),
614  errdetail_internal("%s", errdetail_msg),
615  parser_errposition(pstate, exprLocation(node))));
616 
618  pstate);
619 }
620 
621 /*
622  * Check if the row filter expression is a "simple expression".
623  *
624  * See check_simple_rowfilter_expr_walker for details.
625  */
626 static bool
628 {
629  return check_simple_rowfilter_expr_walker(node, pstate);
630 }
631 
632 /*
633  * Transform the publication WHERE expression for all the relations in the list,
634  * ensuring it is coerced to boolean and necessary collation information is
635  * added if required, and add a new nsitem/RTE for the associated relation to
636  * the ParseState's namespace list.
637  *
638  * Also check the publication row filter expression and throw an error if
639  * anything not permitted or unexpected is encountered.
640  */
641 static void
642 TransformPubWhereClauses(List *tables, const char *queryString,
643  bool pubviaroot)
644 {
645  ListCell *lc;
646 
647  foreach(lc, tables)
648  {
649  ParseNamespaceItem *nsitem;
650  Node *whereclause = NULL;
651  ParseState *pstate;
653 
654  if (pri->whereClause == NULL)
655  continue;
656 
657  /*
658  * If the publication doesn't publish changes via the root partitioned
659  * table, the partition's row filter will be used. So disallow using
660  * WHERE clause on partitioned table in this case.
661  */
662  if (!pubviaroot &&
663  pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
664  ereport(ERROR,
665  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
666  errmsg("cannot use publication WHERE clause for relation \"%s\"",
668  errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
669  "publish_via_partition_root")));
670 
671  /*
672  * A fresh pstate is required so that we only have "this" table in its
673  * rangetable
674  */
675  pstate = make_parsestate(NULL);
676  pstate->p_sourcetext = queryString;
677  nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
678  AccessShareLock, NULL,
679  false, false);
680  addNSItemToQuery(pstate, nsitem, false, true, true);
681 
682  whereclause = transformWhereClause(pstate,
683  copyObject(pri->whereClause),
685  "PUBLICATION WHERE");
686 
687  /* Fix up collation information */
688  assign_expr_collations(pstate, whereclause);
689 
690  /*
691  * We allow only simple expressions in row filters. See
692  * check_simple_rowfilter_expr_walker.
693  */
694  check_simple_rowfilter_expr(whereclause, pstate);
695 
696  free_parsestate(pstate);
697 
698  pri->whereClause = whereclause;
699  }
700 }
701 
702 
703 /*
704  * Given a list of tables that are going to be added to a publication,
705  * verify that they fulfill the necessary preconditions, namely: no tables
706  * have a column list if any schema is published; and partitioned tables do
707  * not have column lists if publish_via_partition_root is not set.
708  *
709  * 'publish_schema' indicates that the publication contains any TABLES IN
710  * SCHEMA elements (newly added in this command, or preexisting).
711  * 'pubviaroot' is the value of publish_via_partition_root.
712  */
713 static void
714 CheckPubRelationColumnList(char *pubname, List *tables,
715  bool publish_schema, bool pubviaroot)
716 {
717  ListCell *lc;
718 
719  foreach(lc, tables)
720  {
722 
723  if (pri->columns == NIL)
724  continue;
725 
726  /*
727  * Disallow specifying column list if any schema is in the
728  * publication.
729  *
730  * XXX We could instead just forbid the case when the publication
731  * tries to publish the table with a column list and a schema for that
732  * table. However, if we do that then we need a restriction during
733  * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
734  * seem to be a good idea.
735  */
736  if (publish_schema)
737  ereport(ERROR,
738  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
739  errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
741  RelationGetRelationName(pri->relation), pubname),
742  errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
743 
744  /*
745  * If the publication doesn't publish changes via the root partitioned
746  * table, the partition's column list will be used. So disallow using
747  * a column list on the partitioned table in this case.
748  */
749  if (!pubviaroot &&
750  pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
751  ereport(ERROR,
752  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
753  errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
755  RelationGetRelationName(pri->relation), pubname),
756  errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
757  "publish_via_partition_root")));
758  }
759 }
760 
761 /*
762  * Create new publication.
763  */
766 {
767  Relation rel;
768  ObjectAddress myself;
769  Oid puboid;
770  bool nulls[Natts_pg_publication];
771  Datum values[Natts_pg_publication];
772  HeapTuple tup;
773  bool publish_given;
774  PublicationActions pubactions;
775  bool publish_via_partition_root_given;
776  bool publish_via_partition_root;
777  bool publish_generated_columns_given;
778  bool publish_generated_columns;
779  AclResult aclresult;
780  List *relations = NIL;
781  List *schemaidlist = NIL;
782 
783  /* must have CREATE privilege on database */
784  aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
785  if (aclresult != ACLCHECK_OK)
786  aclcheck_error(aclresult, OBJECT_DATABASE,
788 
789  /* FOR ALL TABLES requires superuser */
790  if (stmt->for_all_tables && !superuser())
791  ereport(ERROR,
792  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
793  errmsg("must be superuser to create FOR ALL TABLES publication")));
794 
795  rel = table_open(PublicationRelationId, RowExclusiveLock);
796 
797  /* Check if name is used */
798  puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
799  CStringGetDatum(stmt->pubname));
800  if (OidIsValid(puboid))
801  ereport(ERROR,
803  errmsg("publication \"%s\" already exists",
804  stmt->pubname)));
805 
806  /* Form a tuple. */
807  memset(values, 0, sizeof(values));
808  memset(nulls, false, sizeof(nulls));
809 
810  values[Anum_pg_publication_pubname - 1] =
812  values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
813 
815  stmt->options,
816  &publish_given, &pubactions,
817  &publish_via_partition_root_given,
818  &publish_via_partition_root,
819  &publish_generated_columns_given,
820  &publish_generated_columns);
821 
822  puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
823  Anum_pg_publication_oid);
824  values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
825  values[Anum_pg_publication_puballtables - 1] =
826  BoolGetDatum(stmt->for_all_tables);
827  values[Anum_pg_publication_pubinsert - 1] =
828  BoolGetDatum(pubactions.pubinsert);
829  values[Anum_pg_publication_pubupdate - 1] =
830  BoolGetDatum(pubactions.pubupdate);
831  values[Anum_pg_publication_pubdelete - 1] =
832  BoolGetDatum(pubactions.pubdelete);
833  values[Anum_pg_publication_pubtruncate - 1] =
834  BoolGetDatum(pubactions.pubtruncate);
835  values[Anum_pg_publication_pubviaroot - 1] =
836  BoolGetDatum(publish_via_partition_root);
837  values[Anum_pg_publication_pubgencols - 1] =
838  BoolGetDatum(publish_generated_columns);
839 
840  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
841 
842  /* Insert tuple into catalog. */
843  CatalogTupleInsert(rel, tup);
844  heap_freetuple(tup);
845 
846  recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
847 
848  ObjectAddressSet(myself, PublicationRelationId, puboid);
849 
850  /* Make the changes visible. */
852 
853  /* Associate objects with the publication. */
854  if (stmt->for_all_tables)
855  {
856  /* Invalidate relcache so that publication info is rebuilt. */
858  }
859  else
860  {
861  ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
862  &schemaidlist);
863 
864  /* FOR TABLES IN SCHEMA requires superuser */
865  if (schemaidlist != NIL && !superuser())
866  ereport(ERROR,
867  errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
868  errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
869 
870  if (relations != NIL)
871  {
872  List *rels;
873 
874  rels = OpenTableList(relations);
876  publish_via_partition_root);
877 
878  CheckPubRelationColumnList(stmt->pubname, rels,
879  schemaidlist != NIL,
880  publish_via_partition_root);
881 
882  PublicationAddTables(puboid, rels, true, NULL);
883  CloseTableList(rels);
884  }
885 
886  if (schemaidlist != NIL)
887  {
888  /*
889  * Schema lock is held until the publication is created to prevent
890  * concurrent schema deletion.
891  */
892  LockSchemaList(schemaidlist);
893  PublicationAddSchemas(puboid, schemaidlist, true, NULL);
894  }
895  }
896 
898 
899  InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
900 
903  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
904  errmsg("\"wal_level\" is insufficient to publish logical changes"),
905  errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
906 
907  return myself;
908 }
909 
910 /*
911  * Change options of a publication.
912  */
913 static void
915  Relation rel, HeapTuple tup)
916 {
917  bool nulls[Natts_pg_publication];
918  bool replaces[Natts_pg_publication];
919  Datum values[Natts_pg_publication];
920  bool publish_given;
921  PublicationActions pubactions;
922  bool publish_via_partition_root_given;
923  bool publish_via_partition_root;
924  bool publish_generated_columns_given;
925  bool publish_generated_columns;
926  ObjectAddress obj;
927  Form_pg_publication pubform;
928  List *root_relids = NIL;
929  ListCell *lc;
930 
932  stmt->options,
933  &publish_given, &pubactions,
934  &publish_via_partition_root_given,
935  &publish_via_partition_root,
936  &publish_generated_columns_given,
937  &publish_generated_columns);
938 
939  pubform = (Form_pg_publication) GETSTRUCT(tup);
940 
941  /*
942  * If the publication doesn't publish changes via the root partitioned
943  * table, the partition's row filter and column list will be used. So
944  * disallow using WHERE clause and column lists on partitioned table in
945  * this case.
946  */
947  if (!pubform->puballtables && publish_via_partition_root_given &&
948  !publish_via_partition_root)
949  {
950  /*
951  * Lock the publication so nobody else can do anything with it. This
952  * prevents concurrent alter to add partitioned table(s) with WHERE
953  * clause(s) and/or column lists which we don't allow when not
954  * publishing via root.
955  */
956  LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
958 
959  root_relids = GetPublicationRelations(pubform->oid,
961 
962  foreach(lc, root_relids)
963  {
964  Oid relid = lfirst_oid(lc);
965  HeapTuple rftuple;
966  char relkind;
967  char *relname;
968  bool has_rowfilter;
969  bool has_collist;
970 
971  /*
972  * Beware: we don't have lock on the relations, so cope silently
973  * with the cache lookups returning NULL.
974  */
975 
976  rftuple = SearchSysCache2(PUBLICATIONRELMAP,
977  ObjectIdGetDatum(relid),
978  ObjectIdGetDatum(pubform->oid));
979  if (!HeapTupleIsValid(rftuple))
980  continue;
981  has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
982  has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
983  if (!has_rowfilter && !has_collist)
984  {
985  ReleaseSysCache(rftuple);
986  continue;
987  }
988 
989  relkind = get_rel_relkind(relid);
990  if (relkind != RELKIND_PARTITIONED_TABLE)
991  {
992  ReleaseSysCache(rftuple);
993  continue;
994  }
995  relname = get_rel_name(relid);
996  if (relname == NULL) /* table concurrently dropped */
997  {
998  ReleaseSysCache(rftuple);
999  continue;
1000  }
1001 
1002  if (has_rowfilter)
1003  ereport(ERROR,
1004  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1005  errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1006  "publish_via_partition_root",
1007  stmt->pubname),
1008  errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1009  relname, "publish_via_partition_root")));
1010  Assert(has_collist);
1011  ereport(ERROR,
1012  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1013  errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1014  "publish_via_partition_root",
1015  stmt->pubname),
1016  errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1017  relname, "publish_via_partition_root")));
1018  }
1019  }
1020 
1021  /* Everything ok, form a new tuple. */
1022  memset(values, 0, sizeof(values));
1023  memset(nulls, false, sizeof(nulls));
1024  memset(replaces, false, sizeof(replaces));
1025 
1026  if (publish_given)
1027  {
1028  values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
1029  replaces[Anum_pg_publication_pubinsert - 1] = true;
1030 
1031  values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
1032  replaces[Anum_pg_publication_pubupdate - 1] = true;
1033 
1034  values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
1035  replaces[Anum_pg_publication_pubdelete - 1] = true;
1036 
1037  values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
1038  replaces[Anum_pg_publication_pubtruncate - 1] = true;
1039  }
1040 
1041  if (publish_via_partition_root_given)
1042  {
1043  values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1044  replaces[Anum_pg_publication_pubviaroot - 1] = true;
1045  }
1046 
1047  if (publish_generated_columns_given)
1048  {
1049  values[Anum_pg_publication_pubgencols - 1] = BoolGetDatum(publish_generated_columns);
1050  replaces[Anum_pg_publication_pubgencols - 1] = true;
1051  }
1052 
1053  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1054  replaces);
1055 
1056  /* Update the catalog. */
1057  CatalogTupleUpdate(rel, &tup->t_self, tup);
1058 
1060 
1061  pubform = (Form_pg_publication) GETSTRUCT(tup);
1062 
1063  /* Invalidate the relcache. */
1064  if (pubform->puballtables)
1065  {
1067  }
1068  else
1069  {
1070  List *relids = NIL;
1071  List *schemarelids = NIL;
1072 
1073  /*
1074  * For any partitioned tables contained in the publication, we must
1075  * invalidate all partitions contained in the respective partition
1076  * trees, not just those explicitly mentioned in the publication.
1077  */
1078  if (root_relids == NIL)
1079  relids = GetPublicationRelations(pubform->oid,
1081  else
1082  {
1083  /*
1084  * We already got tables explicitly mentioned in the publication.
1085  * Now get all partitions for the partitioned table in the list.
1086  */
1087  foreach(lc, root_relids)
1088  relids = GetPubPartitionOptionRelations(relids,
1090  lfirst_oid(lc));
1091  }
1092 
1093  schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1095  relids = list_concat_unique_oid(relids, schemarelids);
1096 
1097  InvalidatePublicationRels(relids);
1098  }
1099 
1100  ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1102  (Node *) stmt);
1103 
1104  InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1105 }
1106 
1107 /*
1108  * Invalidate the relations.
1109  */
1110 void
1112 {
1113  /*
1114  * We don't want to send too many individual messages, at some point it's
1115  * cheaper to just reset whole relcache.
1116  */
1117  if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1118  {
1119  ListCell *lc;
1120 
1121  foreach(lc, relids)
1123  }
1124  else
1126 }
1127 
1128 /*
1129  * Add or remove table to/from publication.
1130  */
1131 static void
1133  List *tables, const char *queryString,
1134  bool publish_schema)
1135 {
1136  List *rels = NIL;
1138  Oid pubid = pubform->oid;
1139 
1140  /*
1141  * Nothing to do if no objects, except in SET: for that it is quite
1142  * possible that user has not specified any tables in which case we need
1143  * to remove all the existing tables.
1144  */
1145  if (!tables && stmt->action != AP_SetObjects)
1146  return;
1147 
1148  rels = OpenTableList(tables);
1149 
1150  if (stmt->action == AP_AddObjects)
1151  {
1152  TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1153 
1154  publish_schema |= is_schema_publication(pubid);
1155 
1156  CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1157  pubform->pubviaroot);
1158 
1159  PublicationAddTables(pubid, rels, false, stmt);
1160  }
1161  else if (stmt->action == AP_DropObjects)
1162  PublicationDropTables(pubid, rels, false);
1163  else /* AP_SetObjects */
1164  {
1165  List *oldrelids = GetPublicationRelations(pubid,
1167  List *delrels = NIL;
1168  ListCell *oldlc;
1169 
1170  TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1171 
1172  CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1173  pubform->pubviaroot);
1174 
1175  /*
1176  * To recreate the relation list for the publication, look for
1177  * existing relations that do not need to be dropped.
1178  */
1179  foreach(oldlc, oldrelids)
1180  {
1181  Oid oldrelid = lfirst_oid(oldlc);
1182  ListCell *newlc;
1183  PublicationRelInfo *oldrel;
1184  bool found = false;
1185  HeapTuple rftuple;
1186  Node *oldrelwhereclause = NULL;
1187  Bitmapset *oldcolumns = NULL;
1188 
1189  /* look up the cache for the old relmap */
1190  rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1191  ObjectIdGetDatum(oldrelid),
1192  ObjectIdGetDatum(pubid));
1193 
1194  /*
1195  * See if the existing relation currently has a WHERE clause or a
1196  * column list. We need to compare those too.
1197  */
1198  if (HeapTupleIsValid(rftuple))
1199  {
1200  bool isnull = true;
1201  Datum whereClauseDatum;
1202  Datum columnListDatum;
1203 
1204  /* Load the WHERE clause for this table. */
1205  whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1206  Anum_pg_publication_rel_prqual,
1207  &isnull);
1208  if (!isnull)
1209  oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1210 
1211  /* Transform the int2vector column list to a bitmap. */
1212  columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1213  Anum_pg_publication_rel_prattrs,
1214  &isnull);
1215 
1216  if (!isnull)
1217  oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1218 
1219  ReleaseSysCache(rftuple);
1220  }
1221 
1222  foreach(newlc, rels)
1223  {
1224  PublicationRelInfo *newpubrel;
1225  Oid newrelid;
1226  Bitmapset *newcolumns = NULL;
1227 
1228  newpubrel = (PublicationRelInfo *) lfirst(newlc);
1229  newrelid = RelationGetRelid(newpubrel->relation);
1230 
1231  /*
1232  * Validate the column list. If the column list or WHERE
1233  * clause changes, then the validation done here will be
1234  * duplicated inside PublicationAddTables(). The validation
1235  * is cheap enough that that seems harmless.
1236  */
1237  newcolumns = pub_collist_validate(newpubrel->relation,
1238  newpubrel->columns);
1239 
1240  /*
1241  * Check if any of the new set of relations matches with the
1242  * existing relations in the publication. Additionally, if the
1243  * relation has an associated WHERE clause, check the WHERE
1244  * expressions also match. Same for the column list. Drop the
1245  * rest.
1246  */
1247  if (newrelid == oldrelid)
1248  {
1249  if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1250  bms_equal(oldcolumns, newcolumns))
1251  {
1252  found = true;
1253  break;
1254  }
1255  }
1256  }
1257 
1258  /*
1259  * Add the non-matched relations to a list so that they can be
1260  * dropped.
1261  */
1262  if (!found)
1263  {
1264  oldrel = palloc(sizeof(PublicationRelInfo));
1265  oldrel->whereClause = NULL;
1266  oldrel->columns = NIL;
1267  oldrel->relation = table_open(oldrelid,
1269  delrels = lappend(delrels, oldrel);
1270  }
1271  }
1272 
1273  /* And drop them. */
1274  PublicationDropTables(pubid, delrels, true);
1275 
1276  /*
1277  * Don't bother calculating the difference for adding, we'll catch and
1278  * skip existing ones when doing catalog update.
1279  */
1280  PublicationAddTables(pubid, rels, true, stmt);
1281 
1282  CloseTableList(delrels);
1283  }
1284 
1285  CloseTableList(rels);
1286 }
1287 
1288 /*
1289  * Alter the publication schemas.
1290  *
1291  * Add or remove schemas to/from publication.
1292  */
1293 static void
1295  HeapTuple tup, List *schemaidlist)
1296 {
1298 
1299  /*
1300  * Nothing to do if no objects, except in SET: for that it is quite
1301  * possible that user has not specified any schemas in which case we need
1302  * to remove all the existing schemas.
1303  */
1304  if (!schemaidlist && stmt->action != AP_SetObjects)
1305  return;
1306 
1307  /*
1308  * Schema lock is held until the publication is altered to prevent
1309  * concurrent schema deletion.
1310  */
1311  LockSchemaList(schemaidlist);
1312  if (stmt->action == AP_AddObjects)
1313  {
1314  ListCell *lc;
1315  List *reloids;
1316 
1317  reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1318 
1319  foreach(lc, reloids)
1320  {
1321  HeapTuple coltuple;
1322 
1323  coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1325  ObjectIdGetDatum(pubform->oid));
1326 
1327  if (!HeapTupleIsValid(coltuple))
1328  continue;
1329 
1330  /*
1331  * Disallow adding schema if column list is already part of the
1332  * publication. See CheckPubRelationColumnList.
1333  */
1334  if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1335  ereport(ERROR,
1336  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1337  errmsg("cannot add schema to publication \"%s\"",
1338  stmt->pubname),
1339  errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1340 
1341  ReleaseSysCache(coltuple);
1342  }
1343 
1344  PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1345  }
1346  else if (stmt->action == AP_DropObjects)
1347  PublicationDropSchemas(pubform->oid, schemaidlist, false);
1348  else /* AP_SetObjects */
1349  {
1350  List *oldschemaids = GetPublicationSchemas(pubform->oid);
1351  List *delschemas = NIL;
1352 
1353  /* Identify which schemas should be dropped */
1354  delschemas = list_difference_oid(oldschemaids, schemaidlist);
1355 
1356  /*
1357  * Schema lock is held until the publication is altered to prevent
1358  * concurrent schema deletion.
1359  */
1360  LockSchemaList(delschemas);
1361 
1362  /* And drop them */
1363  PublicationDropSchemas(pubform->oid, delschemas, true);
1364 
1365  /*
1366  * Don't bother calculating the difference for adding, we'll catch and
1367  * skip existing ones when doing catalog update.
1368  */
1369  PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1370  }
1371 }
1372 
1373 /*
1374  * Check if relations and schemas can be in a given publication and throw
1375  * appropriate error if not.
1376  */
1377 static void
1379  List *tables, List *schemaidlist)
1380 {
1382 
1383  if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1384  schemaidlist && !superuser())
1385  ereport(ERROR,
1386  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1387  errmsg("must be superuser to add or set schemas")));
1388 
1389  /*
1390  * Check that user is allowed to manipulate the publication tables in
1391  * schema
1392  */
1393  if (schemaidlist && pubform->puballtables)
1394  ereport(ERROR,
1395  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1396  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1397  NameStr(pubform->pubname)),
1398  errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1399 
1400  /* Check that user is allowed to manipulate the publication tables. */
1401  if (tables && pubform->puballtables)
1402  ereport(ERROR,
1403  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1404  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1405  NameStr(pubform->pubname)),
1406  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1407 }
1408 
1409 /*
1410  * Alter the existing publication.
1411  *
1412  * This is dispatcher function for AlterPublicationOptions,
1413  * AlterPublicationSchemas and AlterPublicationTables.
1414  */
1415 void
1417 {
1418  Relation rel;
1419  HeapTuple tup;
1420  Form_pg_publication pubform;
1421 
1422  rel = table_open(PublicationRelationId, RowExclusiveLock);
1423 
1424  tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1425  CStringGetDatum(stmt->pubname));
1426 
1427  if (!HeapTupleIsValid(tup))
1428  ereport(ERROR,
1429  (errcode(ERRCODE_UNDEFINED_OBJECT),
1430  errmsg("publication \"%s\" does not exist",
1431  stmt->pubname)));
1432 
1433  pubform = (Form_pg_publication) GETSTRUCT(tup);
1434 
1435  /* must be owner */
1436  if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1438  stmt->pubname);
1439 
1440  if (stmt->options)
1441  AlterPublicationOptions(pstate, stmt, rel, tup);
1442  else
1443  {
1444  List *relations = NIL;
1445  List *schemaidlist = NIL;
1446  Oid pubid = pubform->oid;
1447 
1448  ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1449  &schemaidlist);
1450 
1451  CheckAlterPublication(stmt, tup, relations, schemaidlist);
1452 
1453  heap_freetuple(tup);
1454 
1455  /* Lock the publication so nobody else can do anything with it. */
1456  LockDatabaseObject(PublicationRelationId, pubid, 0,
1458 
1459  /*
1460  * It is possible that by the time we acquire the lock on publication,
1461  * concurrent DDL has removed it. We can test this by checking the
1462  * existence of publication. We get the tuple again to avoid the risk
1463  * of any publication option getting changed.
1464  */
1465  tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1466  if (!HeapTupleIsValid(tup))
1467  ereport(ERROR,
1468  errcode(ERRCODE_UNDEFINED_OBJECT),
1469  errmsg("publication \"%s\" does not exist",
1470  stmt->pubname));
1471 
1472  AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1473  schemaidlist != NIL);
1474  AlterPublicationSchemas(stmt, tup, schemaidlist);
1475  }
1476 
1477  /* Cleanup. */
1478  heap_freetuple(tup);
1480 }
1481 
1482 /*
1483  * Remove relation from publication by mapping OID.
1484  */
1485 void
1487 {
1488  Relation rel;
1489  HeapTuple tup;
1490  Form_pg_publication_rel pubrel;
1491  List *relids = NIL;
1492 
1493  rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1494 
1495  tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1496 
1497  if (!HeapTupleIsValid(tup))
1498  elog(ERROR, "cache lookup failed for publication table %u",
1499  proid);
1500 
1501  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1502 
1503  /*
1504  * Invalidate relcache so that publication info is rebuilt.
1505  *
1506  * For the partitioned tables, we must invalidate all partitions contained
1507  * in the respective partition hierarchies, not just the one explicitly
1508  * mentioned in the publication. This is required because we implicitly
1509  * publish the child tables when the parent table is published.
1510  */
1512  pubrel->prrelid);
1513 
1514  InvalidatePublicationRels(relids);
1515 
1516  CatalogTupleDelete(rel, &tup->t_self);
1517 
1518  ReleaseSysCache(tup);
1519 
1521 }
1522 
1523 /*
1524  * Remove the publication by mapping OID.
1525  */
1526 void
1528 {
1529  Relation rel;
1530  HeapTuple tup;
1531  Form_pg_publication pubform;
1532 
1533  rel = table_open(PublicationRelationId, RowExclusiveLock);
1534 
1535  tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1536  if (!HeapTupleIsValid(tup))
1537  elog(ERROR, "cache lookup failed for publication %u", pubid);
1538 
1539  pubform = (Form_pg_publication) GETSTRUCT(tup);
1540 
1541  /* Invalidate relcache so that publication info is rebuilt. */
1542  if (pubform->puballtables)
1544 
1545  CatalogTupleDelete(rel, &tup->t_self);
1546 
1547  ReleaseSysCache(tup);
1548 
1550 }
1551 
1552 /*
1553  * Remove schema from publication by mapping OID.
1554  */
1555 void
1557 {
1558  Relation rel;
1559  HeapTuple tup;
1560  List *schemaRels = NIL;
1562 
1563  rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1564 
1565  tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1566 
1567  if (!HeapTupleIsValid(tup))
1568  elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1569 
1570  pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1571 
1572  /*
1573  * Invalidate relcache so that publication info is rebuilt. See
1574  * RemovePublicationRelById for why we need to consider all the
1575  * partitions.
1576  */
1577  schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1579  InvalidatePublicationRels(schemaRels);
1580 
1581  CatalogTupleDelete(rel, &tup->t_self);
1582 
1583  ReleaseSysCache(tup);
1584 
1586 }
1587 
1588 /*
1589  * Open relations specified by a PublicationTable list.
1590  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1591  * add them to a publication.
1592  */
1593 static List *
1595 {
1596  List *relids = NIL;
1597  List *rels = NIL;
1598  ListCell *lc;
1599  List *relids_with_rf = NIL;
1600  List *relids_with_collist = NIL;
1601 
1602  /*
1603  * Open, share-lock, and check all the explicitly-specified relations
1604  */
1605  foreach(lc, tables)
1606  {
1608  bool recurse = t->relation->inh;
1609  Relation rel;
1610  Oid myrelid;
1611  PublicationRelInfo *pub_rel;
1612 
1613  /* Allow query cancel in case this takes a long time */
1615 
1617  myrelid = RelationGetRelid(rel);
1618 
1619  /*
1620  * Filter out duplicates if user specifies "foo, foo".
1621  *
1622  * Note that this algorithm is known to not be very efficient (O(N^2))
1623  * but given that it only works on list of tables given to us by user
1624  * it's deemed acceptable.
1625  */
1626  if (list_member_oid(relids, myrelid))
1627  {
1628  /* Disallow duplicate tables if there are any with row filters. */
1629  if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1630  ereport(ERROR,
1632  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1633  RelationGetRelationName(rel))));
1634 
1635  /* Disallow duplicate tables if there are any with column lists. */
1636  if (t->columns || list_member_oid(relids_with_collist, myrelid))
1637  ereport(ERROR,
1639  errmsg("conflicting or redundant column lists for table \"%s\"",
1640  RelationGetRelationName(rel))));
1641 
1643  continue;
1644  }
1645 
1646  pub_rel = palloc(sizeof(PublicationRelInfo));
1647  pub_rel->relation = rel;
1648  pub_rel->whereClause = t->whereClause;
1649  pub_rel->columns = t->columns;
1650  rels = lappend(rels, pub_rel);
1651  relids = lappend_oid(relids, myrelid);
1652 
1653  if (t->whereClause)
1654  relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1655 
1656  if (t->columns)
1657  relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1658 
1659  /*
1660  * Add children of this rel, if requested, so that they too are added
1661  * to the publication. A partitioned table can't have any inheritance
1662  * children other than its partitions, which need not be explicitly
1663  * added to the publication.
1664  */
1665  if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1666  {
1667  List *children;
1668  ListCell *child;
1669 
1670  children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1671  NULL);
1672 
1673  foreach(child, children)
1674  {
1675  Oid childrelid = lfirst_oid(child);
1676 
1677  /* Allow query cancel in case this takes a long time */
1679 
1680  /*
1681  * Skip duplicates if user specified both parent and child
1682  * tables.
1683  */
1684  if (list_member_oid(relids, childrelid))
1685  {
1686  /*
1687  * We don't allow to specify row filter for both parent
1688  * and child table at the same time as it is not very
1689  * clear which one should be given preference.
1690  */
1691  if (childrelid != myrelid &&
1692  (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1693  ereport(ERROR,
1695  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1696  RelationGetRelationName(rel))));
1697 
1698  /*
1699  * We don't allow to specify column list for both parent
1700  * and child table at the same time as it is not very
1701  * clear which one should be given preference.
1702  */
1703  if (childrelid != myrelid &&
1704  (t->columns || list_member_oid(relids_with_collist, childrelid)))
1705  ereport(ERROR,
1707  errmsg("conflicting or redundant column lists for table \"%s\"",
1708  RelationGetRelationName(rel))));
1709 
1710  continue;
1711  }
1712 
1713  /* find_all_inheritors already got lock */
1714  rel = table_open(childrelid, NoLock);
1715  pub_rel = palloc(sizeof(PublicationRelInfo));
1716  pub_rel->relation = rel;
1717  /* child inherits WHERE clause from parent */
1718  pub_rel->whereClause = t->whereClause;
1719 
1720  /* child inherits column list from parent */
1721  pub_rel->columns = t->columns;
1722  rels = lappend(rels, pub_rel);
1723  relids = lappend_oid(relids, childrelid);
1724 
1725  if (t->whereClause)
1726  relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1727 
1728  if (t->columns)
1729  relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1730  }
1731  }
1732  }
1733 
1734  list_free(relids);
1735  list_free(relids_with_rf);
1736 
1737  return rels;
1738 }
1739 
1740 /*
1741  * Close all relations in the list.
1742  */
1743 static void
1745 {
1746  ListCell *lc;
1747 
1748  foreach(lc, rels)
1749  {
1750  PublicationRelInfo *pub_rel;
1751 
1752  pub_rel = (PublicationRelInfo *) lfirst(lc);
1753  table_close(pub_rel->relation, NoLock);
1754  }
1755 
1756  list_free_deep(rels);
1757 }
1758 
1759 /*
1760  * Lock the schemas specified in the schema list in AccessShareLock mode in
1761  * order to prevent concurrent schema deletion.
1762  */
1763 static void
1764 LockSchemaList(List *schemalist)
1765 {
1766  ListCell *lc;
1767 
1768  foreach(lc, schemalist)
1769  {
1770  Oid schemaid = lfirst_oid(lc);
1771 
1772  /* Allow query cancel in case this takes a long time */
1774  LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1775 
1776  /*
1777  * It is possible that by the time we acquire the lock on schema,
1778  * concurrent DDL has removed it. We can test this by checking the
1779  * existence of schema.
1780  */
1781  if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1782  ereport(ERROR,
1783  errcode(ERRCODE_UNDEFINED_SCHEMA),
1784  errmsg("schema with OID %u does not exist", schemaid));
1785  }
1786 }
1787 
1788 /*
1789  * Add listed tables to the publication.
1790  */
1791 static void
1792 PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1794 {
1795  ListCell *lc;
1796 
1797  Assert(!stmt || !stmt->for_all_tables);
1798 
1799  foreach(lc, rels)
1800  {
1801  PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1802  Relation rel = pub_rel->relation;
1803  ObjectAddress obj;
1804 
1805  /* Must be owner of the table or superuser. */
1806  if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1809 
1810  obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1811  if (stmt)
1812  {
1814  (Node *) stmt);
1815 
1816  InvokeObjectPostCreateHook(PublicationRelRelationId,
1817  obj.objectId, 0);
1818  }
1819  }
1820 }
1821 
1822 /*
1823  * Remove listed tables from the publication.
1824  */
1825 static void
1826 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1827 {
1828  ObjectAddress obj;
1829  ListCell *lc;
1830  Oid prid;
1831 
1832  foreach(lc, rels)
1833  {
1834  PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1835  Relation rel = pubrel->relation;
1836  Oid relid = RelationGetRelid(rel);
1837 
1838  if (pubrel->columns)
1839  ereport(ERROR,
1840  errcode(ERRCODE_SYNTAX_ERROR),
1841  errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1842 
1843  prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1844  ObjectIdGetDatum(relid),
1845  ObjectIdGetDatum(pubid));
1846  if (!OidIsValid(prid))
1847  {
1848  if (missing_ok)
1849  continue;
1850 
1851  ereport(ERROR,
1852  (errcode(ERRCODE_UNDEFINED_OBJECT),
1853  errmsg("relation \"%s\" is not part of the publication",
1854  RelationGetRelationName(rel))));
1855  }
1856 
1857  if (pubrel->whereClause)
1858  ereport(ERROR,
1859  (errcode(ERRCODE_SYNTAX_ERROR),
1860  errmsg("cannot use a WHERE clause when removing a table from a publication")));
1861 
1862  ObjectAddressSet(obj, PublicationRelRelationId, prid);
1863  performDeletion(&obj, DROP_CASCADE, 0);
1864  }
1865 }
1866 
1867 /*
1868  * Add listed schemas to the publication.
1869  */
1870 static void
1871 PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1873 {
1874  ListCell *lc;
1875 
1876  Assert(!stmt || !stmt->for_all_tables);
1877 
1878  foreach(lc, schemas)
1879  {
1880  Oid schemaid = lfirst_oid(lc);
1881  ObjectAddress obj;
1882 
1883  obj = publication_add_schema(pubid, schemaid, if_not_exists);
1884  if (stmt)
1885  {
1887  (Node *) stmt);
1888 
1889  InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1890  obj.objectId, 0);
1891  }
1892  }
1893 }
1894 
1895 /*
1896  * Remove listed schemas from the publication.
1897  */
1898 static void
1899 PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1900 {
1901  ObjectAddress obj;
1902  ListCell *lc;
1903  Oid psid;
1904 
1905  foreach(lc, schemas)
1906  {
1907  Oid schemaid = lfirst_oid(lc);
1908 
1909  psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
1910  Anum_pg_publication_namespace_oid,
1911  ObjectIdGetDatum(schemaid),
1912  ObjectIdGetDatum(pubid));
1913  if (!OidIsValid(psid))
1914  {
1915  if (missing_ok)
1916  continue;
1917 
1918  ereport(ERROR,
1919  (errcode(ERRCODE_UNDEFINED_OBJECT),
1920  errmsg("tables from schema \"%s\" are not part of the publication",
1921  get_namespace_name(schemaid))));
1922  }
1923 
1924  ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1925  performDeletion(&obj, DROP_CASCADE, 0);
1926  }
1927 }
1928 
1929 /*
1930  * Internal workhorse for changing a publication owner
1931  */
1932 static void
1934 {
1935  Form_pg_publication form;
1936 
1937  form = (Form_pg_publication) GETSTRUCT(tup);
1938 
1939  if (form->pubowner == newOwnerId)
1940  return;
1941 
1942  if (!superuser())
1943  {
1944  AclResult aclresult;
1945 
1946  /* Must be owner */
1947  if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
1949  NameStr(form->pubname));
1950 
1951  /* Must be able to become new owner */
1952  check_can_set_role(GetUserId(), newOwnerId);
1953 
1954  /* New owner must have CREATE privilege on database */
1955  aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
1956  if (aclresult != ACLCHECK_OK)
1957  aclcheck_error(aclresult, OBJECT_DATABASE,
1959 
1960  if (form->puballtables && !superuser_arg(newOwnerId))
1961  ereport(ERROR,
1962  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1963  errmsg("permission denied to change owner of publication \"%s\"",
1964  NameStr(form->pubname)),
1965  errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1966 
1967  if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
1968  ereport(ERROR,
1969  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1970  errmsg("permission denied to change owner of publication \"%s\"",
1971  NameStr(form->pubname)),
1972  errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1973  }
1974 
1975  form->pubowner = newOwnerId;
1976  CatalogTupleUpdate(rel, &tup->t_self, tup);
1977 
1978  /* Update owner dependency reference */
1979  changeDependencyOnOwner(PublicationRelationId,
1980  form->oid,
1981  newOwnerId);
1982 
1983  InvokeObjectPostAlterHook(PublicationRelationId,
1984  form->oid, 0);
1985 }
1986 
1987 /*
1988  * Change publication owner -- by name
1989  */
1991 AlterPublicationOwner(const char *name, Oid newOwnerId)
1992 {
1993  Oid subid;
1994  HeapTuple tup;
1995  Relation rel;
1996  ObjectAddress address;
1997  Form_pg_publication pubform;
1998 
1999  rel = table_open(PublicationRelationId, RowExclusiveLock);
2000 
2001  tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2002 
2003  if (!HeapTupleIsValid(tup))
2004  ereport(ERROR,
2005  (errcode(ERRCODE_UNDEFINED_OBJECT),
2006  errmsg("publication \"%s\" does not exist", name)));
2007 
2008  pubform = (Form_pg_publication) GETSTRUCT(tup);
2009  subid = pubform->oid;
2010 
2011  AlterPublicationOwner_internal(rel, tup, newOwnerId);
2012 
2013  ObjectAddressSet(address, PublicationRelationId, subid);
2014 
2015  heap_freetuple(tup);
2016 
2018 
2019  return address;
2020 }
2021 
2022 /*
2023  * Change publication owner -- by OID
2024  */
2025 void
2027 {
2028  HeapTuple tup;
2029  Relation rel;
2030 
2031  rel = table_open(PublicationRelationId, RowExclusiveLock);
2032 
2033  tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
2034 
2035  if (!HeapTupleIsValid(tup))
2036  ereport(ERROR,
2037  (errcode(ERRCODE_UNDEFINED_OBJECT),
2038  errmsg("publication with OID %u does not exist", subid)));
2039 
2040  AlterPublicationOwner_internal(rel, tup, newOwnerId);
2041 
2042  heap_freetuple(tup);
2043 
2045 }
void check_can_set_role(Oid member, Oid role)
Definition: acl.c:5325
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
@ ACLCHECK_NOT_OWNER
Definition: acl.h:185
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2622
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition: aclchk.c:3810
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition: aclchk.c:4064
int16 AttrNumber
Definition: attnum.h:21
#define InvalidAttrNumber
Definition: attnum.h:23
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:142
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1306
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define TextDatumGetCString(d)
Definition: builtins.h:98
#define NameStr(name)
Definition: c.h:700
#define Assert(condition)
Definition: c.h:812
#define OidIsValid(objectId)
Definition: c.h:729
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:419
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3187
bool defGetBoolean(DefElem *def)
Definition: define.c:107
char * defGetString(DefElem *def)
Definition: define.c:48
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition: define.c:384
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:273
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1230
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define _(x)
Definition: elog.c:90
#define WARNING
Definition: elog.h:36
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:641
Oid MyDatabaseId
Definition: globals.c:93
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1209
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
Definition: heaptuple.c:455
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1434
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
#define stmt
Definition: indent_codes.h:59
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1609
void CacheInvalidateRelcacheAll(void)
Definition: inval.c:1576
int x
Definition: isn.c:70
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:76
List * list_difference_oid(const List *list1, const List *list2)
Definition: list.c:1313
List * list_concat_unique_oid(List *list1, const List *list2)
Definition: list.c:1469
List * lappend(List *list, void *datum)
Definition: list.c:339
List * lappend_oid(List *list, Oid datum)
Definition: list.c:375
void list_free(List *list)
Definition: list.c:1546
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:722
List * list_append_unique_oid(List *list, Oid datum)
Definition: list.c:1380
void list_free_deep(List *list)
Definition: list.c:1560
void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:993
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
#define RowExclusiveLock
Definition: lockdefs.h:38
AttrNumber get_attnum(Oid relid, const char *attname)
Definition: lsyscache.c:858
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3366
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2003
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1928
char func_volatile(Oid funcid)
Definition: lsyscache.c:1780
char * get_attname(Oid relid, AttrNumber attnum, bool missing_ok)
Definition: lsyscache.c:827
void * palloc(Size size)
Definition: mcxt.c:1317
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
Oid GetUserId(void)
Definition: miscinit.c:524
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
List * fetch_search_path(bool includeImplicit)
Definition: namespace.c:4819
Oid get_namespace_oid(const char *nspname, bool missing_ok)
Definition: namespace.c:3535
Oid exprType(const Node *expr)
Definition: nodeFuncs.c:42
Oid exprInputCollation(const Node *expr)
Definition: nodeFuncs.c:1068
bool check_functions_in_node(Node *node, check_function_callback checker, void *context)
Definition: nodeFuncs.c:1899
Oid exprCollation(const Node *expr)
Definition: nodeFuncs.c:816
int exprLocation(const Node *expr)
Definition: nodeFuncs.c:1380
#define expression_tree_walker(n, w, c)
Definition: nodeFuncs.h:153
#define IsA(nodeptr, _type_)
Definition: nodes.h:158
#define copyObject(obj)
Definition: nodes.h:224
#define nodeTag(nodeptr)
Definition: nodes.h:133
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:173
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:197
ObjectType get_relkind_objtype(char relkind)
const ObjectAddress InvalidObjectAddress
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
Node * transformWhereClause(ParseState *pstate, Node *clause, ParseExprKind exprKind, const char *constructName)
void assign_expr_collations(ParseState *pstate, Node *expr)
void free_parsestate(ParseState *pstate)
Definition: parse_node.c:72
int parser_errposition(ParseState *pstate, int location)
Definition: parse_node.c:106
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:39
@ EXPR_KIND_WHERE
Definition: parse_node.h:46
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
void addNSItemToQuery(ParseState *pstate, ParseNamespaceItem *nsitem, bool addToJoinList, bool addToRelNameSpace, bool addToVarNameSpace)
@ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA
Definition: parsenodes.h:4174
@ PUBLICATIONOBJ_TABLES_IN_SCHEMA
Definition: parsenodes.h:4173
@ PUBLICATIONOBJ_TABLE
Definition: parsenodes.h:4172
@ AP_DropObjects
Definition: parsenodes.h:4200
@ AP_SetObjects
Definition: parsenodes.h:4201
@ AP_AddObjects
Definition: parsenodes.h:4199
@ DROP_CASCADE
Definition: parsenodes.h:2342
@ OBJECT_DATABASE
Definition: parsenodes.h:2277
@ OBJECT_PUBLICATION
Definition: parsenodes.h:2298
#define ACL_CREATE
Definition: parsenodes.h:85
int16 attnum
Definition: pg_attribute.h:74
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:209
NameData relname
Definition: pg_class.h:38
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:255
#define lfirst(lc)
Definition: pg_list.h:172
#define lfirst_node(type, lc)
Definition: pg_list.h:176
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define linitial_oid(l)
Definition: pg_list.h:180
#define lfirst_oid(lc)
Definition: pg_list.h:174
bool is_schema_publication(Oid pubid)
List * GetPublicationSchemas(Oid pubid)
Publication * GetPublication(Oid pubid)
ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists)
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Bitmapset * pub_collist_validate(Relation targetrel, List *columns)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols)
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
List * GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
FormData_pg_publication * Form_pg_publication
@ PUBLICATION_PART_ROOT
@ PUBLICATION_PART_ALL
FormData_pg_publication_namespace * Form_pg_publication_namespace
FormData_pg_publication_rel * Form_pg_publication_rel
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:316
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:168
uintptr_t Datum
Definition: postgres.h:64
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:350
unsigned int Oid
Definition: postgres_ext.h:31
struct rf_context rf_context
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt)
static bool contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
static void AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, List *schemaidlist)
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
void InvalidatePublicationRels(List *relids)
static void CloseTableList(List *rels)
void RemovePublicationSchemaById(Oid psoid)
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
static void TransformPubWhereClauses(List *tables, const char *queryString, bool pubviaroot)
ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
bool pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot, bool pubgencols, bool *invalid_column_list, bool *invalid_gen_col)
static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, List **rels, List **schemas)
static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist)
static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context)
void RemovePublicationById(Oid pubid)
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root, bool *publish_generated_columns_given, bool *publish_generated_columns)
static List * OpenTableList(List *tables)
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
static bool check_simple_rowfilter_expr(Node *node, ParseState *pstate)
void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
void RemovePublicationRelById(Oid proid)
ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId)
static void CheckPubRelationColumnList(char *pubname, List *tables, bool publish_schema, bool pubviaroot)
void AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
static void AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, const char *queryString, bool publish_schema)
static void LockSchemaList(List *schemalist)
static bool check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
#define MAX_RELCACHE_INVAL_MSGS
tree context
Definition: radixtree.h:1837
void * stringToNode(const char *str)
Definition: read.c:90
#define RelationGetRelid(relation)
Definition: rel.h:505
#define RelationGetDescr(relation)
Definition: rel.h:531
#define RelationGetRelationName(relation)
Definition: rel.h:539
#define RelationGetNamespace(relation)
Definition: rel.h:546
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition: relcache.c:5233
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
Definition: relcache.h:63
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
char * defname
Definition: parsenodes.h:817
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
Definition: nodes.h:129
const char * p_sourcetext
Definition: parse_node.h:209
PublicationObjSpecType pubobjtype
Definition: parsenodes.h:4182
PublicationTable * pubtable
Definition: parsenodes.h:4184
RangeVar * relation
Definition: parsenodes.h:4162
bool inh
Definition: primnodes.h:85
TupleDesc rd_att
Definition: rel.h:112
Form_pg_class rd_rel
Definition: rel.h:111
bool has_generated_stored
Definition: tupdesc.h:45
TupleConstr * constr
Definition: tupdesc.h:85
Definition: primnodes.h:248
AttrNumber varattno
Definition: primnodes.h:260
Bitmapset * bms_replident
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
bool superuser(void)
Definition: superuser.c:46
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:269
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:221
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:600
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:232
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91
#define SearchSysCacheExists1(cacheId, key1)
Definition: syscache.h:100
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:109
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: table.c:83
#define FirstNormalObjectId
Definition: transam.h:197
#define TupleDescAttr(tupdesc, i)
Definition: tupdesc.h:92
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3432
const char * name
void CommandCounterIncrement(void)
Definition: xact.c:1099
int wal_level
Definition: xlog.c:131
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:76