PostgreSQL Source Code  git master
publicationcmds.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * publicationcmds.c
4  * publication manipulation
5  *
6  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  * src/backend/commands/publicationcmds.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/table.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/objectaccess.h"
25 #include "catalog/objectaddress.h"
26 #include "catalog/partition.h"
27 #include "catalog/pg_database.h"
28 #include "catalog/pg_inherits.h"
29 #include "catalog/pg_namespace.h"
30 #include "catalog/pg_proc.h"
31 #include "catalog/pg_publication.h"
34 #include "catalog/pg_type.h"
35 #include "commands/dbcommands.h"
36 #include "commands/defrem.h"
37 #include "commands/event_trigger.h"
39 #include "funcapi.h"
40 #include "miscadmin.h"
41 #include "nodes/nodeFuncs.h"
42 #include "parser/parse_clause.h"
43 #include "parser/parse_collate.h"
44 #include "parser/parse_relation.h"
45 #include "storage/lmgr.h"
46 #include "utils/acl.h"
47 #include "utils/array.h"
48 #include "utils/builtins.h"
49 #include "utils/catcache.h"
50 #include "utils/fmgroids.h"
51 #include "utils/inval.h"
52 #include "utils/lsyscache.h"
53 #include "utils/rel.h"
54 #include "utils/syscache.h"
55 #include "utils/varlena.h"
56 
57 
58 /*
59  * Information used to validate the columns in the row filter expression. See
60  * contain_invalid_rfcolumn_walker for details.
61  */
62 typedef struct rf_context
63 {
64  Bitmapset *bms_replident; /* bitset of replica identity columns */
65  bool pubviaroot; /* true if we are validating the parent
66  * relation's row filter */
67  Oid relid; /* relid of the relation */
68  Oid parentid; /* relid of the parent relation */
70 
71 static List *OpenTableList(List *tables);
72 static void CloseTableList(List *rels);
73 static void LockSchemaList(List *schemalist);
74 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
76 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
77 static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
79 static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
80 
81 
82 static void
84  List *options,
85  bool *publish_given,
86  PublicationActions *pubactions,
87  bool *publish_via_partition_root_given,
88  bool *publish_via_partition_root)
89 {
90  ListCell *lc;
91 
92  *publish_given = false;
93  *publish_via_partition_root_given = false;
94 
95  /* defaults */
96  pubactions->pubinsert = true;
97  pubactions->pubupdate = true;
98  pubactions->pubdelete = true;
99  pubactions->pubtruncate = true;
100  *publish_via_partition_root = false;
101 
102  /* Parse options */
103  foreach(lc, options)
104  {
105  DefElem *defel = (DefElem *) lfirst(lc);
106 
107  if (strcmp(defel->defname, "publish") == 0)
108  {
109  char *publish;
110  List *publish_list;
111  ListCell *lc2;
112 
113  if (*publish_given)
114  errorConflictingDefElem(defel, pstate);
115 
116  /*
117  * If publish option was given only the explicitly listed actions
118  * should be published.
119  */
120  pubactions->pubinsert = false;
121  pubactions->pubupdate = false;
122  pubactions->pubdelete = false;
123  pubactions->pubtruncate = false;
124 
125  *publish_given = true;
126  publish = defGetString(defel);
127 
128  if (!SplitIdentifierString(publish, ',', &publish_list))
129  ereport(ERROR,
130  (errcode(ERRCODE_SYNTAX_ERROR),
131  errmsg("invalid list syntax in parameter \"%s\"",
132  "publish")));
133 
134  /* Process the option list. */
135  foreach(lc2, publish_list)
136  {
137  char *publish_opt = (char *) lfirst(lc2);
138 
139  if (strcmp(publish_opt, "insert") == 0)
140  pubactions->pubinsert = true;
141  else if (strcmp(publish_opt, "update") == 0)
142  pubactions->pubupdate = true;
143  else if (strcmp(publish_opt, "delete") == 0)
144  pubactions->pubdelete = true;
145  else if (strcmp(publish_opt, "truncate") == 0)
146  pubactions->pubtruncate = true;
147  else
148  ereport(ERROR,
149  (errcode(ERRCODE_SYNTAX_ERROR),
150  errmsg("unrecognized value for publication option \"%s\": \"%s\"",
151  "publish", publish_opt)));
152  }
153  }
154  else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
155  {
156  if (*publish_via_partition_root_given)
157  errorConflictingDefElem(defel, pstate);
158  *publish_via_partition_root_given = true;
159  *publish_via_partition_root = defGetBoolean(defel);
160  }
161  else
162  ereport(ERROR,
163  (errcode(ERRCODE_SYNTAX_ERROR),
164  errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
165  }
166 }
167 
168 /*
169  * Convert the PublicationObjSpecType list into schema oid list and
170  * PublicationTable list.
171  */
172 static void
173 ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
174  List **rels, List **schemas)
175 {
176  ListCell *cell;
177  PublicationObjSpec *pubobj;
178 
179  if (!pubobjspec_list)
180  return;
181 
182  foreach(cell, pubobjspec_list)
183  {
184  Oid schemaid;
185  List *search_path;
186 
187  pubobj = (PublicationObjSpec *) lfirst(cell);
188 
189  switch (pubobj->pubobjtype)
190  {
192  *rels = lappend(*rels, pubobj->pubtable);
193  break;
195  schemaid = get_namespace_oid(pubobj->name, false);
196 
197  /* Filter out duplicates if user specifies "sch1, sch1" */
198  *schemas = list_append_unique_oid(*schemas, schemaid);
199  break;
201  search_path = fetch_search_path(false);
202  if (search_path == NIL) /* nothing valid in search_path? */
203  ereport(ERROR,
204  errcode(ERRCODE_UNDEFINED_SCHEMA),
205  errmsg("no schema has been selected for CURRENT_SCHEMA"));
206 
207  schemaid = linitial_oid(search_path);
208  list_free(search_path);
209 
210  /* Filter out duplicates if user specifies "sch1, sch1" */
211  *schemas = list_append_unique_oid(*schemas, schemaid);
212  break;
213  default:
214  /* shouldn't happen */
215  elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
216  break;
217  }
218  }
219 }
220 
221 /*
222  * Returns true if any of the columns used in the row filter WHERE expression is
223  * not part of REPLICA IDENTITY, false otherwise.
224  */
225 static bool
227 {
228  if (node == NULL)
229  return false;
230 
231  if (IsA(node, Var))
232  {
233  Var *var = (Var *) node;
234  AttrNumber attnum = var->varattno;
235 
236  /*
237  * If pubviaroot is true, we are validating the row filter of the
238  * parent table, but the bitmap contains the replica identity
239  * information of the child table. So, get the column number of the
240  * child table as parent and child column order could be different.
241  */
242  if (context->pubviaroot)
243  {
244  char *colname = get_attname(context->parentid, attnum, false);
245 
246  attnum = get_attnum(context->relid, colname);
247  }
248 
250  context->bms_replident))
251  return true;
252  }
253 
255  (void *) context);
256 }
257 
258 /*
259  * Check if all columns referenced in the filter expression are part of the
260  * REPLICA IDENTITY index or not.
261  *
262  * Returns true if any invalid column is found.
263  */
264 bool
265 pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
266  bool pubviaroot)
267 {
268  HeapTuple rftuple;
269  Oid relid = RelationGetRelid(relation);
270  Oid publish_as_relid = RelationGetRelid(relation);
271  bool result = false;
272  Datum rfdatum;
273  bool rfisnull;
274 
275  /*
276  * FULL means all columns are in the REPLICA IDENTITY, so all columns are
277  * allowed in the row filter and we can skip the validation.
278  */
279  if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
280  return false;
281 
282  /*
283  * For a partition, if pubviaroot is true, find the topmost ancestor that
284  * is published via this publication as we need to use its row filter
285  * expression to filter the partition's changes.
286  *
287  * Note that even though the row filter used is for an ancestor, the
288  * REPLICA IDENTITY used will be for the actual child table.
289  */
290  if (pubviaroot && relation->rd_rel->relispartition)
291  {
292  publish_as_relid
293  = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
294 
295  if (!OidIsValid(publish_as_relid))
296  publish_as_relid = relid;
297  }
298 
300  ObjectIdGetDatum(publish_as_relid),
301  ObjectIdGetDatum(pubid));
302 
303  if (!HeapTupleIsValid(rftuple))
304  return false;
305 
306  rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
307  Anum_pg_publication_rel_prqual,
308  &rfisnull);
309 
310  if (!rfisnull)
311  {
312  rf_context context = {0};
313  Node *rfnode;
314  Bitmapset *bms = NULL;
315 
316  context.pubviaroot = pubviaroot;
317  context.parentid = publish_as_relid;
318  context.relid = relid;
319 
320  /* Remember columns that are part of the REPLICA IDENTITY */
321  bms = RelationGetIndexAttrBitmap(relation,
323 
324  context.bms_replident = bms;
325  rfnode = stringToNode(TextDatumGetCString(rfdatum));
326  result = contain_invalid_rfcolumn_walker(rfnode, &context);
327  }
328 
329  ReleaseSysCache(rftuple);
330 
331  return result;
332 }
333 
334 /*
335  * Check if all columns referenced in the REPLICA IDENTITY are covered by
336  * the column list.
337  *
338  * Returns true if any replica identity column is not covered by column list.
339  */
340 bool
342  bool pubviaroot)
343 {
344  HeapTuple tuple;
345  Oid relid = RelationGetRelid(relation);
346  Oid publish_as_relid = RelationGetRelid(relation);
347  bool result = false;
348  Datum datum;
349  bool isnull;
350 
351  /*
352  * For a partition, if pubviaroot is true, find the topmost ancestor that
353  * is published via this publication as we need to use its column list for
354  * the changes.
355  *
356  * Note that even though the column list used is for an ancestor, the
357  * REPLICA IDENTITY used will be for the actual child table.
358  */
359  if (pubviaroot && relation->rd_rel->relispartition)
360  {
361  publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
362 
363  if (!OidIsValid(publish_as_relid))
364  publish_as_relid = relid;
365  }
366 
368  ObjectIdGetDatum(publish_as_relid),
369  ObjectIdGetDatum(pubid));
370 
371  if (!HeapTupleIsValid(tuple))
372  return false;
373 
374  datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
375  Anum_pg_publication_rel_prattrs,
376  &isnull);
377 
378  if (!isnull)
379  {
380  int x;
381  Bitmapset *idattrs;
382  Bitmapset *columns = NULL;
383 
384  /* With REPLICA IDENTITY FULL, no column list is allowed. */
385  if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
386  result = true;
387 
388  /* Transform the column list datum to a bitmapset. */
389  columns = pub_collist_to_bitmapset(NULL, datum, NULL);
390 
391  /* Remember columns that are part of the REPLICA IDENTITY */
392  idattrs = RelationGetIndexAttrBitmap(relation,
394 
395  /*
396  * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
397  * offset (to handle system columns the usual way), while column list
398  * does not use offset, so we can't do bms_is_subset(). Instead, we
399  * have to loop over the idattrs and check all of them are in the
400  * list.
401  */
402  x = -1;
403  while ((x = bms_next_member(idattrs, x)) >= 0)
404  {
406 
407  /*
408  * If pubviaroot is true, we are validating the column list of the
409  * parent table, but the bitmap contains the replica identity
410  * information of the child table. The parent/child attnums may
411  * not match, so translate them to the parent - get the attname
412  * from the child, and look it up in the parent.
413  */
414  if (pubviaroot)
415  {
416  /* attribute name in the child table */
417  char *colname = get_attname(relid, attnum, false);
418 
419  /*
420  * Determine the attnum for the attribute name in parent (we
421  * are using the column list defined on the parent).
422  */
423  attnum = get_attnum(publish_as_relid, colname);
424  }
425 
426  /* replica identity column, not covered by the column list */
427  if (!bms_is_member(attnum, columns))
428  {
429  result = true;
430  break;
431  }
432  }
433 
434  bms_free(idattrs);
435  bms_free(columns);
436  }
437 
438  ReleaseSysCache(tuple);
439 
440  return result;
441 }
442 
443 /* check_functions_in_node callback */
444 static bool
446 {
447  return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
448  func_id >= FirstNormalObjectId);
449 }
450 
451 /*
452  * The row filter walker checks if the row filter expression is a "simple
453  * expression".
454  *
455  * It allows only simple or compound expressions such as:
456  * - (Var Op Const)
457  * - (Var Op Var)
458  * - (Var Op Const) AND/OR (Var Op Const)
459  * - etc
460  * (where Var is a column of the table this filter belongs to)
461  *
462  * The simple expression has the following restrictions:
463  * - User-defined operators are not allowed;
464  * - User-defined functions are not allowed;
465  * - User-defined types are not allowed;
466  * - User-defined collations are not allowed;
467  * - Non-immutable built-in functions are not allowed;
468  * - System columns are not allowed.
469  *
470  * NOTES
471  *
472  * We don't allow user-defined functions/operators/types/collations because
473  * (a) if a user drops a user-defined object used in a row filter expression or
474  * if there is any other error while using it, the logical decoding
475  * infrastructure won't be able to recover from such an error even if the
476  * object is recreated again because a historic snapshot is used to evaluate
477  * the row filter;
478  * (b) a user-defined function can be used to access tables that could have
479  * unpleasant results because a historic snapshot is used. That's why only
480  * immutable built-in functions are allowed in row filter expressions.
481  *
482  * We don't allow system columns because currently, we don't have that
483  * information in the tuple passed to downstream. Also, as we don't replicate
484  * those to subscribers, there doesn't seem to be a need for a filter on those
485  * columns.
486  *
487  * We can allow other node types after more analysis and testing.
488  */
489 static bool
491 {
492  char *errdetail_msg = NULL;
493 
494  if (node == NULL)
495  return false;
496 
497  switch (nodeTag(node))
498  {
499  case T_Var:
500  /* System columns are not allowed. */
501  if (((Var *) node)->varattno < InvalidAttrNumber)
502  errdetail_msg = _("System columns are not allowed.");
503  break;
504  case T_OpExpr:
505  case T_DistinctExpr:
506  case T_NullIfExpr:
507  /* OK, except user-defined operators are not allowed. */
508  if (((OpExpr *) node)->opno >= FirstNormalObjectId)
509  errdetail_msg = _("User-defined operators are not allowed.");
510  break;
511  case T_ScalarArrayOpExpr:
512  /* OK, except user-defined operators are not allowed. */
513  if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
514  errdetail_msg = _("User-defined operators are not allowed.");
515 
516  /*
517  * We don't need to check the hashfuncid and negfuncid of
518  * ScalarArrayOpExpr as those functions are only built for a
519  * subquery.
520  */
521  break;
522  case T_RowCompareExpr:
523  {
524  ListCell *opid;
525 
526  /* OK, except user-defined operators are not allowed. */
527  foreach(opid, ((RowCompareExpr *) node)->opnos)
528  {
529  if (lfirst_oid(opid) >= FirstNormalObjectId)
530  {
531  errdetail_msg = _("User-defined operators are not allowed.");
532  break;
533  }
534  }
535  }
536  break;
537  case T_Const:
538  case T_FuncExpr:
539  case T_BoolExpr:
540  case T_RelabelType:
541  case T_CollateExpr:
542  case T_CaseExpr:
543  case T_CaseTestExpr:
544  case T_ArrayExpr:
545  case T_RowExpr:
546  case T_CoalesceExpr:
547  case T_MinMaxExpr:
548  case T_XmlExpr:
549  case T_NullTest:
550  case T_BooleanTest:
551  case T_List:
552  /* OK, supported */
553  break;
554  default:
555  errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
556  break;
557  }
558 
559  /*
560  * For all the supported nodes, if we haven't already found a problem,
561  * check the types, functions, and collations used in it. We check List
562  * by walking through each element.
563  */
564  if (!errdetail_msg && !IsA(node, List))
565  {
566  if (exprType(node) >= FirstNormalObjectId)
567  errdetail_msg = _("User-defined types are not allowed.");
569  (void *) pstate))
570  errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
571  else if (exprCollation(node) >= FirstNormalObjectId ||
573  errdetail_msg = _("User-defined collations are not allowed.");
574  }
575 
576  /*
577  * If we found a problem in this node, throw error now. Otherwise keep
578  * going.
579  */
580  if (errdetail_msg)
581  ereport(ERROR,
582  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
583  errmsg("invalid publication WHERE expression"),
584  errdetail_internal("%s", errdetail_msg),
585  parser_errposition(pstate, exprLocation(node))));
586 
588  (void *) pstate);
589 }
590 
591 /*
592  * Check if the row filter expression is a "simple expression".
593  *
594  * See check_simple_rowfilter_expr_walker for details.
595  */
596 static bool
598 {
599  return check_simple_rowfilter_expr_walker(node, pstate);
600 }
601 
602 /*
603  * Transform the publication WHERE expression for all the relations in the list,
604  * ensuring it is coerced to boolean and necessary collation information is
605  * added if required, and add a new nsitem/RTE for the associated relation to
606  * the ParseState's namespace list.
607  *
608  * Also check the publication row filter expression and throw an error if
609  * anything not permitted or unexpected is encountered.
610  */
611 static void
612 TransformPubWhereClauses(List *tables, const char *queryString,
613  bool pubviaroot)
614 {
615  ListCell *lc;
616 
617  foreach(lc, tables)
618  {
619  ParseNamespaceItem *nsitem;
620  Node *whereclause = NULL;
621  ParseState *pstate;
623 
624  if (pri->whereClause == NULL)
625  continue;
626 
627  /*
628  * If the publication doesn't publish changes via the root partitioned
629  * table, the partition's row filter will be used. So disallow using
630  * WHERE clause on partitioned table in this case.
631  */
632  if (!pubviaroot &&
633  pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
634  ereport(ERROR,
635  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
636  errmsg("cannot use publication WHERE clause for relation \"%s\"",
638  errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
639  "publish_via_partition_root")));
640 
641  /*
642  * A fresh pstate is required so that we only have "this" table in its
643  * rangetable
644  */
645  pstate = make_parsestate(NULL);
646  pstate->p_sourcetext = queryString;
647  nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
648  AccessShareLock, NULL,
649  false, false);
650  addNSItemToQuery(pstate, nsitem, false, true, true);
651 
652  whereclause = transformWhereClause(pstate,
653  copyObject(pri->whereClause),
655  "PUBLICATION WHERE");
656 
657  /* Fix up collation information */
658  assign_expr_collations(pstate, whereclause);
659 
660  /*
661  * We allow only simple expressions in row filters. See
662  * check_simple_rowfilter_expr_walker.
663  */
664  check_simple_rowfilter_expr(whereclause, pstate);
665 
666  free_parsestate(pstate);
667 
668  pri->whereClause = whereclause;
669  }
670 }
671 
672 
673 /*
674  * Given a list of tables that are going to be added to a publication,
675  * verify that they fulfill the necessary preconditions, namely: no tables
676  * have a column list if any schema is published; and partitioned tables do
677  * not have column lists if publish_via_partition_root is not set.
678  *
679  * 'publish_schema' indicates that the publication contains any TABLES IN
680  * SCHEMA elements (newly added in this command, or preexisting).
681  * 'pubviaroot' is the value of publish_via_partition_root.
682  */
683 static void
684 CheckPubRelationColumnList(char *pubname, List *tables,
685  bool publish_schema, bool pubviaroot)
686 {
687  ListCell *lc;
688 
689  foreach(lc, tables)
690  {
692 
693  if (pri->columns == NIL)
694  continue;
695 
696  /*
697  * Disallow specifying column list if any schema is in the
698  * publication.
699  *
700  * XXX We could instead just forbid the case when the publication
701  * tries to publish the table with a column list and a schema for that
702  * table. However, if we do that then we need a restriction during
703  * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
704  * seem to be a good idea.
705  */
706  if (publish_schema)
707  ereport(ERROR,
708  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
709  errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
711  RelationGetRelationName(pri->relation), pubname),
712  errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
713 
714  /*
715  * If the publication doesn't publish changes via the root partitioned
716  * table, the partition's column list will be used. So disallow using
717  * a column list on the partitioned table in this case.
718  */
719  if (!pubviaroot &&
720  pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
721  ereport(ERROR,
722  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
723  errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
725  RelationGetRelationName(pri->relation), pubname),
726  errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
727  "publish_via_partition_root")));
728  }
729 }
730 
731 /*
732  * Create new publication.
733  */
736 {
737  Relation rel;
738  ObjectAddress myself;
739  Oid puboid;
740  bool nulls[Natts_pg_publication];
741  Datum values[Natts_pg_publication];
742  HeapTuple tup;
743  bool publish_given;
744  PublicationActions pubactions;
745  bool publish_via_partition_root_given;
746  bool publish_via_partition_root;
747  AclResult aclresult;
748  List *relations = NIL;
749  List *schemaidlist = NIL;
750 
751  /* must have CREATE privilege on database */
752  aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
753  if (aclresult != ACLCHECK_OK)
754  aclcheck_error(aclresult, OBJECT_DATABASE,
756 
757  /* FOR ALL TABLES requires superuser */
758  if (stmt->for_all_tables && !superuser())
759  ereport(ERROR,
760  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
761  errmsg("must be superuser to create FOR ALL TABLES publication")));
762 
763  rel = table_open(PublicationRelationId, RowExclusiveLock);
764 
765  /* Check if name is used */
766  puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
767  CStringGetDatum(stmt->pubname));
768  if (OidIsValid(puboid))
769  ereport(ERROR,
771  errmsg("publication \"%s\" already exists",
772  stmt->pubname)));
773 
774  /* Form a tuple. */
775  memset(values, 0, sizeof(values));
776  memset(nulls, false, sizeof(nulls));
777 
778  values[Anum_pg_publication_pubname - 1] =
780  values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
781 
783  stmt->options,
784  &publish_given, &pubactions,
785  &publish_via_partition_root_given,
786  &publish_via_partition_root);
787 
788  puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
789  Anum_pg_publication_oid);
790  values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
791  values[Anum_pg_publication_puballtables - 1] =
792  BoolGetDatum(stmt->for_all_tables);
793  values[Anum_pg_publication_pubinsert - 1] =
794  BoolGetDatum(pubactions.pubinsert);
795  values[Anum_pg_publication_pubupdate - 1] =
796  BoolGetDatum(pubactions.pubupdate);
797  values[Anum_pg_publication_pubdelete - 1] =
798  BoolGetDatum(pubactions.pubdelete);
799  values[Anum_pg_publication_pubtruncate - 1] =
800  BoolGetDatum(pubactions.pubtruncate);
801  values[Anum_pg_publication_pubviaroot - 1] =
802  BoolGetDatum(publish_via_partition_root);
803 
804  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
805 
806  /* Insert tuple into catalog. */
807  CatalogTupleInsert(rel, tup);
808  heap_freetuple(tup);
809 
810  recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
811 
812  ObjectAddressSet(myself, PublicationRelationId, puboid);
813 
814  /* Make the changes visible. */
816 
817  /* Associate objects with the publication. */
818  if (stmt->for_all_tables)
819  {
820  /* Invalidate relcache so that publication info is rebuilt. */
822  }
823  else
824  {
825  ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
826  &schemaidlist);
827 
828  /* FOR TABLES IN SCHEMA requires superuser */
829  if (schemaidlist != NIL && !superuser())
830  ereport(ERROR,
831  errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
832  errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
833 
834  if (relations != NIL)
835  {
836  List *rels;
837 
838  rels = OpenTableList(relations);
840  publish_via_partition_root);
841 
842  CheckPubRelationColumnList(stmt->pubname, rels,
843  schemaidlist != NIL,
844  publish_via_partition_root);
845 
846  PublicationAddTables(puboid, rels, true, NULL);
847  CloseTableList(rels);
848  }
849 
850  if (schemaidlist != NIL)
851  {
852  /*
853  * Schema lock is held until the publication is created to prevent
854  * concurrent schema deletion.
855  */
856  LockSchemaList(schemaidlist);
857  PublicationAddSchemas(puboid, schemaidlist, true, NULL);
858  }
859  }
860 
862 
863  InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
864 
867  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
868  errmsg("wal_level is insufficient to publish logical changes"),
869  errhint("Set wal_level to \"logical\" before creating subscriptions.")));
870 
871  return myself;
872 }
873 
874 /*
875  * Change options of a publication.
876  */
877 static void
879  Relation rel, HeapTuple tup)
880 {
881  bool nulls[Natts_pg_publication];
882  bool replaces[Natts_pg_publication];
883  Datum values[Natts_pg_publication];
884  bool publish_given;
885  PublicationActions pubactions;
886  bool publish_via_partition_root_given;
887  bool publish_via_partition_root;
888  ObjectAddress obj;
889  Form_pg_publication pubform;
890  List *root_relids = NIL;
891  ListCell *lc;
892 
894  stmt->options,
895  &publish_given, &pubactions,
896  &publish_via_partition_root_given,
897  &publish_via_partition_root);
898 
899  pubform = (Form_pg_publication) GETSTRUCT(tup);
900 
901  /*
902  * If the publication doesn't publish changes via the root partitioned
903  * table, the partition's row filter and column list will be used. So
904  * disallow using WHERE clause and column lists on partitioned table in
905  * this case.
906  */
907  if (!pubform->puballtables && publish_via_partition_root_given &&
908  !publish_via_partition_root)
909  {
910  /*
911  * Lock the publication so nobody else can do anything with it. This
912  * prevents concurrent alter to add partitioned table(s) with WHERE
913  * clause(s) and/or column lists which we don't allow when not
914  * publishing via root.
915  */
916  LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
918 
919  root_relids = GetPublicationRelations(pubform->oid,
921 
922  foreach(lc, root_relids)
923  {
924  Oid relid = lfirst_oid(lc);
925  HeapTuple rftuple;
926  char relkind;
927  char *relname;
928  bool has_rowfilter;
929  bool has_collist;
930 
931  /*
932  * Beware: we don't have lock on the relations, so cope silently
933  * with the cache lookups returning NULL.
934  */
935 
937  ObjectIdGetDatum(relid),
938  ObjectIdGetDatum(pubform->oid));
939  if (!HeapTupleIsValid(rftuple))
940  continue;
941  has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
942  has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
943  if (!has_rowfilter && !has_collist)
944  {
945  ReleaseSysCache(rftuple);
946  continue;
947  }
948 
949  relkind = get_rel_relkind(relid);
950  if (relkind != RELKIND_PARTITIONED_TABLE)
951  {
952  ReleaseSysCache(rftuple);
953  continue;
954  }
955  relname = get_rel_name(relid);
956  if (relname == NULL) /* table concurrently dropped */
957  {
958  ReleaseSysCache(rftuple);
959  continue;
960  }
961 
962  if (has_rowfilter)
963  ereport(ERROR,
964  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
965  errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
966  "publish_via_partition_root",
967  stmt->pubname),
968  errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
969  relname, "publish_via_partition_root")));
970  Assert(has_collist);
971  ereport(ERROR,
972  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
973  errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
974  "publish_via_partition_root",
975  stmt->pubname),
976  errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
977  relname, "publish_via_partition_root")));
978  }
979  }
980 
981  /* Everything ok, form a new tuple. */
982  memset(values, 0, sizeof(values));
983  memset(nulls, false, sizeof(nulls));
984  memset(replaces, false, sizeof(replaces));
985 
986  if (publish_given)
987  {
988  values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
989  replaces[Anum_pg_publication_pubinsert - 1] = true;
990 
991  values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
992  replaces[Anum_pg_publication_pubupdate - 1] = true;
993 
994  values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
995  replaces[Anum_pg_publication_pubdelete - 1] = true;
996 
997  values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
998  replaces[Anum_pg_publication_pubtruncate - 1] = true;
999  }
1000 
1001  if (publish_via_partition_root_given)
1002  {
1003  values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1004  replaces[Anum_pg_publication_pubviaroot - 1] = true;
1005  }
1006 
1007  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1008  replaces);
1009 
1010  /* Update the catalog. */
1011  CatalogTupleUpdate(rel, &tup->t_self, tup);
1012 
1014 
1015  pubform = (Form_pg_publication) GETSTRUCT(tup);
1016 
1017  /* Invalidate the relcache. */
1018  if (pubform->puballtables)
1019  {
1021  }
1022  else
1023  {
1024  List *relids = NIL;
1025  List *schemarelids = NIL;
1026 
1027  /*
1028  * For any partitioned tables contained in the publication, we must
1029  * invalidate all partitions contained in the respective partition
1030  * trees, not just those explicitly mentioned in the publication.
1031  */
1032  if (root_relids == NIL)
1033  relids = GetPublicationRelations(pubform->oid,
1035  else
1036  {
1037  /*
1038  * We already got tables explicitly mentioned in the publication.
1039  * Now get all partitions for the partitioned table in the list.
1040  */
1041  foreach(lc, root_relids)
1042  relids = GetPubPartitionOptionRelations(relids,
1044  lfirst_oid(lc));
1045  }
1046 
1047  schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1049  relids = list_concat_unique_oid(relids, schemarelids);
1050 
1051  InvalidatePublicationRels(relids);
1052  }
1053 
1054  ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1056  (Node *) stmt);
1057 
1058  InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1059 }
1060 
1061 /*
1062  * Invalidate the relations.
1063  */
1064 void
1066 {
1067  /*
1068  * We don't want to send too many individual messages, at some point it's
1069  * cheaper to just reset whole relcache.
1070  */
1071  if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1072  {
1073  ListCell *lc;
1074 
1075  foreach(lc, relids)
1077  }
1078  else
1080 }
1081 
1082 /*
1083  * Add or remove table to/from publication.
1084  */
1085 static void
1087  List *tables, const char *queryString,
1088  bool publish_schema)
1089 {
1090  List *rels = NIL;
1092  Oid pubid = pubform->oid;
1093 
1094  /*
1095  * Nothing to do if no objects, except in SET: for that it is quite
1096  * possible that user has not specified any tables in which case we need
1097  * to remove all the existing tables.
1098  */
1099  if (!tables && stmt->action != AP_SetObjects)
1100  return;
1101 
1102  rels = OpenTableList(tables);
1103 
1104  if (stmt->action == AP_AddObjects)
1105  {
1106  TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1107 
1108  publish_schema |= is_schema_publication(pubid);
1109 
1110  CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1111  pubform->pubviaroot);
1112 
1113  PublicationAddTables(pubid, rels, false, stmt);
1114  }
1115  else if (stmt->action == AP_DropObjects)
1116  PublicationDropTables(pubid, rels, false);
1117  else /* AP_SetObjects */
1118  {
1119  List *oldrelids = GetPublicationRelations(pubid,
1121  List *delrels = NIL;
1122  ListCell *oldlc;
1123 
1124  TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1125 
1126  CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1127  pubform->pubviaroot);
1128 
1129  /*
1130  * To recreate the relation list for the publication, look for
1131  * existing relations that do not need to be dropped.
1132  */
1133  foreach(oldlc, oldrelids)
1134  {
1135  Oid oldrelid = lfirst_oid(oldlc);
1136  ListCell *newlc;
1137  PublicationRelInfo *oldrel;
1138  bool found = false;
1139  HeapTuple rftuple;
1140  Node *oldrelwhereclause = NULL;
1141  Bitmapset *oldcolumns = NULL;
1142 
1143  /* look up the cache for the old relmap */
1145  ObjectIdGetDatum(oldrelid),
1146  ObjectIdGetDatum(pubid));
1147 
1148  /*
1149  * See if the existing relation currently has a WHERE clause or a
1150  * column list. We need to compare those too.
1151  */
1152  if (HeapTupleIsValid(rftuple))
1153  {
1154  bool isnull = true;
1155  Datum whereClauseDatum;
1156  Datum columnListDatum;
1157 
1158  /* Load the WHERE clause for this table. */
1159  whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1160  Anum_pg_publication_rel_prqual,
1161  &isnull);
1162  if (!isnull)
1163  oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1164 
1165  /* Transform the int2vector column list to a bitmap. */
1166  columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1167  Anum_pg_publication_rel_prattrs,
1168  &isnull);
1169 
1170  if (!isnull)
1171  oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1172 
1173  ReleaseSysCache(rftuple);
1174  }
1175 
1176  foreach(newlc, rels)
1177  {
1178  PublicationRelInfo *newpubrel;
1179  Oid newrelid;
1180  Bitmapset *newcolumns = NULL;
1181 
1182  newpubrel = (PublicationRelInfo *) lfirst(newlc);
1183  newrelid = RelationGetRelid(newpubrel->relation);
1184 
1185  /*
1186  * If the new publication has column list, transform it to a
1187  * bitmap too.
1188  */
1189  if (newpubrel->columns)
1190  {
1191  ListCell *lc;
1192 
1193  foreach(lc, newpubrel->columns)
1194  {
1195  char *colname = strVal(lfirst(lc));
1196  AttrNumber attnum = get_attnum(newrelid, colname);
1197 
1198  newcolumns = bms_add_member(newcolumns, attnum);
1199  }
1200  }
1201 
1202  /*
1203  * Check if any of the new set of relations matches with the
1204  * existing relations in the publication. Additionally, if the
1205  * relation has an associated WHERE clause, check the WHERE
1206  * expressions also match. Same for the column list. Drop the
1207  * rest.
1208  */
1209  if (RelationGetRelid(newpubrel->relation) == oldrelid)
1210  {
1211  if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1212  bms_equal(oldcolumns, newcolumns))
1213  {
1214  found = true;
1215  break;
1216  }
1217  }
1218  }
1219 
1220  /*
1221  * Add the non-matched relations to a list so that they can be
1222  * dropped.
1223  */
1224  if (!found)
1225  {
1226  oldrel = palloc(sizeof(PublicationRelInfo));
1227  oldrel->whereClause = NULL;
1228  oldrel->columns = NIL;
1229  oldrel->relation = table_open(oldrelid,
1231  delrels = lappend(delrels, oldrel);
1232  }
1233  }
1234 
1235  /* And drop them. */
1236  PublicationDropTables(pubid, delrels, true);
1237 
1238  /*
1239  * Don't bother calculating the difference for adding, we'll catch and
1240  * skip existing ones when doing catalog update.
1241  */
1242  PublicationAddTables(pubid, rels, true, stmt);
1243 
1244  CloseTableList(delrels);
1245  }
1246 
1247  CloseTableList(rels);
1248 }
1249 
1250 /*
1251  * Alter the publication schemas.
1252  *
1253  * Add or remove schemas to/from publication.
1254  */
1255 static void
1257  HeapTuple tup, List *schemaidlist)
1258 {
1260 
1261  /*
1262  * Nothing to do if no objects, except in SET: for that it is quite
1263  * possible that user has not specified any schemas in which case we need
1264  * to remove all the existing schemas.
1265  */
1266  if (!schemaidlist && stmt->action != AP_SetObjects)
1267  return;
1268 
1269  /*
1270  * Schema lock is held until the publication is altered to prevent
1271  * concurrent schema deletion.
1272  */
1273  LockSchemaList(schemaidlist);
1274  if (stmt->action == AP_AddObjects)
1275  {
1276  ListCell *lc;
1277  List *reloids;
1278 
1279  reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1280 
1281  foreach(lc, reloids)
1282  {
1283  HeapTuple coltuple;
1284 
1287  ObjectIdGetDatum(pubform->oid));
1288 
1289  if (!HeapTupleIsValid(coltuple))
1290  continue;
1291 
1292  /*
1293  * Disallow adding schema if column list is already part of the
1294  * publication. See CheckPubRelationColumnList.
1295  */
1296  if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1297  ereport(ERROR,
1298  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1299  errmsg("cannot add schema to publication \"%s\"",
1300  stmt->pubname),
1301  errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1302 
1303  ReleaseSysCache(coltuple);
1304  }
1305 
1306  PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1307  }
1308  else if (stmt->action == AP_DropObjects)
1309  PublicationDropSchemas(pubform->oid, schemaidlist, false);
1310  else /* AP_SetObjects */
1311  {
1312  List *oldschemaids = GetPublicationSchemas(pubform->oid);
1313  List *delschemas = NIL;
1314 
1315  /* Identify which schemas should be dropped */
1316  delschemas = list_difference_oid(oldschemaids, schemaidlist);
1317 
1318  /*
1319  * Schema lock is held until the publication is altered to prevent
1320  * concurrent schema deletion.
1321  */
1322  LockSchemaList(delschemas);
1323 
1324  /* And drop them */
1325  PublicationDropSchemas(pubform->oid, delschemas, true);
1326 
1327  /*
1328  * Don't bother calculating the difference for adding, we'll catch and
1329  * skip existing ones when doing catalog update.
1330  */
1331  PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1332  }
1333 }
1334 
1335 /*
1336  * Check if relations and schemas can be in a given publication and throw
1337  * appropriate error if not.
1338  */
1339 static void
1341  List *tables, List *schemaidlist)
1342 {
1344 
1345  if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1346  schemaidlist && !superuser())
1347  ereport(ERROR,
1348  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1349  errmsg("must be superuser to add or set schemas")));
1350 
1351  /*
1352  * Check that user is allowed to manipulate the publication tables in
1353  * schema
1354  */
1355  if (schemaidlist && pubform->puballtables)
1356  ereport(ERROR,
1357  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1358  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1359  NameStr(pubform->pubname)),
1360  errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1361 
1362  /* Check that user is allowed to manipulate the publication tables. */
1363  if (tables && pubform->puballtables)
1364  ereport(ERROR,
1365  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1366  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1367  NameStr(pubform->pubname)),
1368  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1369 }
1370 
1371 /*
1372  * Alter the existing publication.
1373  *
1374  * This is dispatcher function for AlterPublicationOptions,
1375  * AlterPublicationSchemas and AlterPublicationTables.
1376  */
1377 void
1379 {
1380  Relation rel;
1381  HeapTuple tup;
1382  Form_pg_publication pubform;
1383 
1384  rel = table_open(PublicationRelationId, RowExclusiveLock);
1385 
1387  CStringGetDatum(stmt->pubname));
1388 
1389  if (!HeapTupleIsValid(tup))
1390  ereport(ERROR,
1391  (errcode(ERRCODE_UNDEFINED_OBJECT),
1392  errmsg("publication \"%s\" does not exist",
1393  stmt->pubname)));
1394 
1395  pubform = (Form_pg_publication) GETSTRUCT(tup);
1396 
1397  /* must be owner */
1398  if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1400  stmt->pubname);
1401 
1402  if (stmt->options)
1403  AlterPublicationOptions(pstate, stmt, rel, tup);
1404  else
1405  {
1406  List *relations = NIL;
1407  List *schemaidlist = NIL;
1408  Oid pubid = pubform->oid;
1409 
1410  ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1411  &schemaidlist);
1412 
1413  CheckAlterPublication(stmt, tup, relations, schemaidlist);
1414 
1415  heap_freetuple(tup);
1416 
1417  /* Lock the publication so nobody else can do anything with it. */
1418  LockDatabaseObject(PublicationRelationId, pubid, 0,
1420 
1421  /*
1422  * It is possible that by the time we acquire the lock on publication,
1423  * concurrent DDL has removed it. We can test this by checking the
1424  * existence of publication. We get the tuple again to avoid the risk
1425  * of any publication option getting changed.
1426  */
1428  if (!HeapTupleIsValid(tup))
1429  ereport(ERROR,
1430  errcode(ERRCODE_UNDEFINED_OBJECT),
1431  errmsg("publication \"%s\" does not exist",
1432  stmt->pubname));
1433 
1434  AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1435  schemaidlist != NIL);
1436  AlterPublicationSchemas(stmt, tup, schemaidlist);
1437  }
1438 
1439  /* Cleanup. */
1440  heap_freetuple(tup);
1442 }
1443 
1444 /*
1445  * Remove relation from publication by mapping OID.
1446  */
1447 void
1449 {
1450  Relation rel;
1451  HeapTuple tup;
1452  Form_pg_publication_rel pubrel;
1453  List *relids = NIL;
1454 
1455  rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1456 
1458 
1459  if (!HeapTupleIsValid(tup))
1460  elog(ERROR, "cache lookup failed for publication table %u",
1461  proid);
1462 
1463  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1464 
1465  /*
1466  * Invalidate relcache so that publication info is rebuilt.
1467  *
1468  * For the partitioned tables, we must invalidate all partitions contained
1469  * in the respective partition hierarchies, not just the one explicitly
1470  * mentioned in the publication. This is required because we implicitly
1471  * publish the child tables when the parent table is published.
1472  */
1474  pubrel->prrelid);
1475 
1476  InvalidatePublicationRels(relids);
1477 
1478  CatalogTupleDelete(rel, &tup->t_self);
1479 
1480  ReleaseSysCache(tup);
1481 
1483 }
1484 
1485 /*
1486  * Remove the publication by mapping OID.
1487  */
1488 void
1490 {
1491  Relation rel;
1492  HeapTuple tup;
1493  Form_pg_publication pubform;
1494 
1495  rel = table_open(PublicationRelationId, RowExclusiveLock);
1496 
1498  if (!HeapTupleIsValid(tup))
1499  elog(ERROR, "cache lookup failed for publication %u", pubid);
1500 
1501  pubform = (Form_pg_publication) GETSTRUCT(tup);
1502 
1503  /* Invalidate relcache so that publication info is rebuilt. */
1504  if (pubform->puballtables)
1506 
1507  CatalogTupleDelete(rel, &tup->t_self);
1508 
1509  ReleaseSysCache(tup);
1510 
1512 }
1513 
1514 /*
1515  * Remove schema from publication by mapping OID.
1516  */
1517 void
1519 {
1520  Relation rel;
1521  HeapTuple tup;
1522  List *schemaRels = NIL;
1524 
1525  rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1526 
1528 
1529  if (!HeapTupleIsValid(tup))
1530  elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1531 
1532  pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1533 
1534  /*
1535  * Invalidate relcache so that publication info is rebuilt. See
1536  * RemovePublicationRelById for why we need to consider all the
1537  * partitions.
1538  */
1539  schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1541  InvalidatePublicationRels(schemaRels);
1542 
1543  CatalogTupleDelete(rel, &tup->t_self);
1544 
1545  ReleaseSysCache(tup);
1546 
1548 }
1549 
1550 /*
1551  * Open relations specified by a PublicationTable list.
1552  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1553  * add them to a publication.
1554  */
1555 static List *
1557 {
1558  List *relids = NIL;
1559  List *rels = NIL;
1560  ListCell *lc;
1561  List *relids_with_rf = NIL;
1562  List *relids_with_collist = NIL;
1563 
1564  /*
1565  * Open, share-lock, and check all the explicitly-specified relations
1566  */
1567  foreach(lc, tables)
1568  {
1570  bool recurse = t->relation->inh;
1571  Relation rel;
1572  Oid myrelid;
1573  PublicationRelInfo *pub_rel;
1574 
1575  /* Allow query cancel in case this takes a long time */
1577 
1579  myrelid = RelationGetRelid(rel);
1580 
1581  /*
1582  * Filter out duplicates if user specifies "foo, foo".
1583  *
1584  * Note that this algorithm is known to not be very efficient (O(N^2))
1585  * but given that it only works on list of tables given to us by user
1586  * it's deemed acceptable.
1587  */
1588  if (list_member_oid(relids, myrelid))
1589  {
1590  /* Disallow duplicate tables if there are any with row filters. */
1591  if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1592  ereport(ERROR,
1594  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1595  RelationGetRelationName(rel))));
1596 
1597  /* Disallow duplicate tables if there are any with column lists. */
1598  if (t->columns || list_member_oid(relids_with_collist, myrelid))
1599  ereport(ERROR,
1601  errmsg("conflicting or redundant column lists for table \"%s\"",
1602  RelationGetRelationName(rel))));
1603 
1605  continue;
1606  }
1607 
1608  pub_rel = palloc(sizeof(PublicationRelInfo));
1609  pub_rel->relation = rel;
1610  pub_rel->whereClause = t->whereClause;
1611  pub_rel->columns = t->columns;
1612  rels = lappend(rels, pub_rel);
1613  relids = lappend_oid(relids, myrelid);
1614 
1615  if (t->whereClause)
1616  relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1617 
1618  if (t->columns)
1619  relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1620 
1621  /*
1622  * Add children of this rel, if requested, so that they too are added
1623  * to the publication. A partitioned table can't have any inheritance
1624  * children other than its partitions, which need not be explicitly
1625  * added to the publication.
1626  */
1627  if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1628  {
1629  List *children;
1630  ListCell *child;
1631 
1632  children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1633  NULL);
1634 
1635  foreach(child, children)
1636  {
1637  Oid childrelid = lfirst_oid(child);
1638 
1639  /* Allow query cancel in case this takes a long time */
1641 
1642  /*
1643  * Skip duplicates if user specified both parent and child
1644  * tables.
1645  */
1646  if (list_member_oid(relids, childrelid))
1647  {
1648  /*
1649  * We don't allow to specify row filter for both parent
1650  * and child table at the same time as it is not very
1651  * clear which one should be given preference.
1652  */
1653  if (childrelid != myrelid &&
1654  (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1655  ereport(ERROR,
1657  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1658  RelationGetRelationName(rel))));
1659 
1660  /*
1661  * We don't allow to specify column list for both parent
1662  * and child table at the same time as it is not very
1663  * clear which one should be given preference.
1664  */
1665  if (childrelid != myrelid &&
1666  (t->columns || list_member_oid(relids_with_collist, childrelid)))
1667  ereport(ERROR,
1669  errmsg("conflicting or redundant column lists for table \"%s\"",
1670  RelationGetRelationName(rel))));
1671 
1672  continue;
1673  }
1674 
1675  /* find_all_inheritors already got lock */
1676  rel = table_open(childrelid, NoLock);
1677  pub_rel = palloc(sizeof(PublicationRelInfo));
1678  pub_rel->relation = rel;
1679  /* child inherits WHERE clause from parent */
1680  pub_rel->whereClause = t->whereClause;
1681 
1682  /* child inherits column list from parent */
1683  pub_rel->columns = t->columns;
1684  rels = lappend(rels, pub_rel);
1685  relids = lappend_oid(relids, childrelid);
1686 
1687  if (t->whereClause)
1688  relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1689 
1690  if (t->columns)
1691  relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1692  }
1693  }
1694  }
1695 
1696  list_free(relids);
1697  list_free(relids_with_rf);
1698 
1699  return rels;
1700 }
1701 
1702 /*
1703  * Close all relations in the list.
1704  */
1705 static void
1707 {
1708  ListCell *lc;
1709 
1710  foreach(lc, rels)
1711  {
1712  PublicationRelInfo *pub_rel;
1713 
1714  pub_rel = (PublicationRelInfo *) lfirst(lc);
1715  table_close(pub_rel->relation, NoLock);
1716  }
1717 
1718  list_free_deep(rels);
1719 }
1720 
1721 /*
1722  * Lock the schemas specified in the schema list in AccessShareLock mode in
1723  * order to prevent concurrent schema deletion.
1724  */
1725 static void
1726 LockSchemaList(List *schemalist)
1727 {
1728  ListCell *lc;
1729 
1730  foreach(lc, schemalist)
1731  {
1732  Oid schemaid = lfirst_oid(lc);
1733 
1734  /* Allow query cancel in case this takes a long time */
1736  LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1737 
1738  /*
1739  * It is possible that by the time we acquire the lock on schema,
1740  * concurrent DDL has removed it. We can test this by checking the
1741  * existence of schema.
1742  */
1744  ereport(ERROR,
1745  errcode(ERRCODE_UNDEFINED_SCHEMA),
1746  errmsg("schema with OID %u does not exist", schemaid));
1747  }
1748 }
1749 
1750 /*
1751  * Add listed tables to the publication.
1752  */
1753 static void
1754 PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1756 {
1757  ListCell *lc;
1758 
1759  Assert(!stmt || !stmt->for_all_tables);
1760 
1761  foreach(lc, rels)
1762  {
1763  PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1764  Relation rel = pub_rel->relation;
1765  ObjectAddress obj;
1766 
1767  /* Must be owner of the table or superuser. */
1768  if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1771 
1772  obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1773  if (stmt)
1774  {
1776  (Node *) stmt);
1777 
1778  InvokeObjectPostCreateHook(PublicationRelRelationId,
1779  obj.objectId, 0);
1780  }
1781  }
1782 }
1783 
1784 /*
1785  * Remove listed tables from the publication.
1786  */
1787 static void
1788 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1789 {
1790  ObjectAddress obj;
1791  ListCell *lc;
1792  Oid prid;
1793 
1794  foreach(lc, rels)
1795  {
1796  PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1797  Relation rel = pubrel->relation;
1798  Oid relid = RelationGetRelid(rel);
1799 
1800  if (pubrel->columns)
1801  ereport(ERROR,
1802  errcode(ERRCODE_SYNTAX_ERROR),
1803  errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1804 
1805  prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1806  ObjectIdGetDatum(relid),
1807  ObjectIdGetDatum(pubid));
1808  if (!OidIsValid(prid))
1809  {
1810  if (missing_ok)
1811  continue;
1812 
1813  ereport(ERROR,
1814  (errcode(ERRCODE_UNDEFINED_OBJECT),
1815  errmsg("relation \"%s\" is not part of the publication",
1816  RelationGetRelationName(rel))));
1817  }
1818 
1819  if (pubrel->whereClause)
1820  ereport(ERROR,
1821  (errcode(ERRCODE_SYNTAX_ERROR),
1822  errmsg("cannot use a WHERE clause when removing a table from a publication")));
1823 
1824  ObjectAddressSet(obj, PublicationRelRelationId, prid);
1825  performDeletion(&obj, DROP_CASCADE, 0);
1826  }
1827 }
1828 
1829 /*
1830  * Add listed schemas to the publication.
1831  */
1832 static void
1833 PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1835 {
1836  ListCell *lc;
1837 
1838  Assert(!stmt || !stmt->for_all_tables);
1839 
1840  foreach(lc, schemas)
1841  {
1842  Oid schemaid = lfirst_oid(lc);
1843  ObjectAddress obj;
1844 
1845  obj = publication_add_schema(pubid, schemaid, if_not_exists);
1846  if (stmt)
1847  {
1849  (Node *) stmt);
1850 
1851  InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1852  obj.objectId, 0);
1853  }
1854  }
1855 }
1856 
1857 /*
1858  * Remove listed schemas from the publication.
1859  */
1860 static void
1861 PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1862 {
1863  ObjectAddress obj;
1864  ListCell *lc;
1865  Oid psid;
1866 
1867  foreach(lc, schemas)
1868  {
1869  Oid schemaid = lfirst_oid(lc);
1870 
1872  Anum_pg_publication_namespace_oid,
1873  ObjectIdGetDatum(schemaid),
1874  ObjectIdGetDatum(pubid));
1875  if (!OidIsValid(psid))
1876  {
1877  if (missing_ok)
1878  continue;
1879 
1880  ereport(ERROR,
1881  (errcode(ERRCODE_UNDEFINED_OBJECT),
1882  errmsg("tables from schema \"%s\" are not part of the publication",
1883  get_namespace_name(schemaid))));
1884  }
1885 
1886  ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1887  performDeletion(&obj, DROP_CASCADE, 0);
1888  }
1889 }
1890 
1891 /*
1892  * Internal workhorse for changing a publication owner
1893  */
1894 static void
1896 {
1897  Form_pg_publication form;
1898 
1899  form = (Form_pg_publication) GETSTRUCT(tup);
1900 
1901  if (form->pubowner == newOwnerId)
1902  return;
1903 
1904  if (!superuser())
1905  {
1906  AclResult aclresult;
1907 
1908  /* Must be owner */
1909  if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
1911  NameStr(form->pubname));
1912 
1913  /* Must be able to become new owner */
1914  check_can_set_role(GetUserId(), newOwnerId);
1915 
1916  /* New owner must have CREATE privilege on database */
1917  aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
1918  if (aclresult != ACLCHECK_OK)
1919  aclcheck_error(aclresult, OBJECT_DATABASE,
1921 
1922  if (form->puballtables && !superuser_arg(newOwnerId))
1923  ereport(ERROR,
1924  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1925  errmsg("permission denied to change owner of publication \"%s\"",
1926  NameStr(form->pubname)),
1927  errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1928 
1929  if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
1930  ereport(ERROR,
1931  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1932  errmsg("permission denied to change owner of publication \"%s\"",
1933  NameStr(form->pubname)),
1934  errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1935  }
1936 
1937  form->pubowner = newOwnerId;
1938  CatalogTupleUpdate(rel, &tup->t_self, tup);
1939 
1940  /* Update owner dependency reference */
1941  changeDependencyOnOwner(PublicationRelationId,
1942  form->oid,
1943  newOwnerId);
1944 
1945  InvokeObjectPostAlterHook(PublicationRelationId,
1946  form->oid, 0);
1947 }
1948 
1949 /*
1950  * Change publication owner -- by name
1951  */
1953 AlterPublicationOwner(const char *name, Oid newOwnerId)
1954 {
1955  Oid subid;
1956  HeapTuple tup;
1957  Relation rel;
1958  ObjectAddress address;
1959  Form_pg_publication pubform;
1960 
1961  rel = table_open(PublicationRelationId, RowExclusiveLock);
1962 
1964 
1965  if (!HeapTupleIsValid(tup))
1966  ereport(ERROR,
1967  (errcode(ERRCODE_UNDEFINED_OBJECT),
1968  errmsg("publication \"%s\" does not exist", name)));
1969 
1970  pubform = (Form_pg_publication) GETSTRUCT(tup);
1971  subid = pubform->oid;
1972 
1973  AlterPublicationOwner_internal(rel, tup, newOwnerId);
1974 
1975  ObjectAddressSet(address, PublicationRelationId, subid);
1976 
1977  heap_freetuple(tup);
1978 
1980 
1981  return address;
1982 }
1983 
1984 /*
1985  * Change publication owner -- by OID
1986  */
1987 void
1989 {
1990  HeapTuple tup;
1991  Relation rel;
1992 
1993  rel = table_open(PublicationRelationId, RowExclusiveLock);
1994 
1996 
1997  if (!HeapTupleIsValid(tup))
1998  ereport(ERROR,
1999  (errcode(ERRCODE_UNDEFINED_OBJECT),
2000  errmsg("publication with OID %u does not exist", subid)));
2001 
2002  AlterPublicationOwner_internal(rel, tup, newOwnerId);
2003 
2004  heap_freetuple(tup);
2005 
2007 }
void check_can_set_role(Oid member, Oid role)
Definition: acl.c:5117
AclResult
Definition: acl.h:181
@ ACLCHECK_OK
Definition: acl.h:182
@ ACLCHECK_NOT_OWNER
Definition: acl.h:184
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2695
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition: aclchk.c:3843
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition: aclchk.c:4097
int16 AttrNumber
Definition: attnum.h:21
#define InvalidAttrNumber
Definition: attnum.h:23
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
Definition: bitmapset.c:97
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1106
void bms_free(Bitmapset *a)
Definition: bitmapset.c:194
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:460
Bitmapset * bms_add_member(Bitmapset *a, int x)
Definition: bitmapset.c:753
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define TextDatumGetCString(d)
Definition: builtins.h:95
#define NameStr(name)
Definition: c.h:735
#define OidIsValid(objectId)
Definition: c.h:764
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:393
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3089
bool defGetBoolean(DefElem *def)
Definition: define.c:108
char * defGetString(DefElem *def)
Definition: define.c:49
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition: define.c:385
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:328
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1229
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define _(x)
Definition: elog.c:91
#define WARNING
Definition: elog.h:36
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
bool equal(const void *a, const void *b)
Definition: equalfuncs.c:223
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:642
Oid MyDatabaseId
Definition: globals.c:89
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
Definition: heaptuple.c:456
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
#define stmt
Definition: indent_codes.h:59
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1421
void CacheInvalidateRelcacheAll(void)
Definition: inval.c:1386
int x
Definition: isn.c:71
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
Assert(fmt[strlen(fmt) - 1] !='\n')
List * list_difference_oid(const List *list1, const List *list2)
Definition: list.c:1312
List * list_concat_unique_oid(List *list1, const List *list2)
Definition: list.c:1468
List * lappend(List *list, void *datum)
Definition: list.c:338
List * lappend_oid(List *list, Oid datum)
Definition: list.c:374
void list_free(List *list)
Definition: list.c:1545
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:721
List * list_append_unique_oid(List *list, Oid datum)
Definition: list.c:1379
void list_free_deep(List *list)
Definition: list.c:1559
void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1005
#define NoLock
Definition: lockdefs.h:34
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define AccessShareLock
Definition: lockdefs.h:36
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
#define RowExclusiveLock
Definition: lockdefs.h:38
AttrNumber get_attnum(Oid relid, const char *attname)
Definition: lsyscache.c:857
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3348
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:2007
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1932
char func_volatile(Oid funcid)
Definition: lsyscache.c:1784
char * get_attname(Oid relid, AttrNumber attnum, bool missing_ok)
Definition: lsyscache.c:826
void * palloc(Size size)
Definition: mcxt.c:1226
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
Oid GetUserId(void)
Definition: miscinit.c:508
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
List * fetch_search_path(bool includeImplicit)
Definition: namespace.c:4784
Oid get_namespace_oid(const char *nspname, bool missing_ok)
Definition: namespace.c:3502
Oid exprType(const Node *expr)
Definition: nodeFuncs.c:43
Oid exprInputCollation(const Node *expr)
Definition: nodeFuncs.c:1018
bool check_functions_in_node(Node *node, check_function_callback checker, void *context)
Definition: nodeFuncs.c:1817
Oid exprCollation(const Node *expr)
Definition: nodeFuncs.c:786
int exprLocation(const Node *expr)
Definition: nodeFuncs.c:1312
#define expression_tree_walker(n, w, c)
Definition: nodeFuncs.h:151
#define IsA(nodeptr, _type_)
Definition: nodes.h:179
#define copyObject(obj)
Definition: nodes.h:244
#define nodeTag(nodeptr)
Definition: nodes.h:133
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:173
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:197
ObjectType get_relkind_objtype(char relkind)
const ObjectAddress InvalidObjectAddress
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
Node * transformWhereClause(ParseState *pstate, Node *clause, ParseExprKind exprKind, const char *constructName)
void assign_expr_collations(ParseState *pstate, Node *expr)
void free_parsestate(ParseState *pstate)
Definition: parse_node.c:77
int parser_errposition(ParseState *pstate, int location)
Definition: parse_node.c:111
ParseState * make_parsestate(ParseState *parentParseState)
Definition: parse_node.c:44
@ EXPR_KIND_WHERE
Definition: parse_node.h:46
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
void addNSItemToQuery(ParseState *pstate, ParseNamespaceItem *nsitem, bool addToJoinList, bool addToRelNameSpace, bool addToVarNameSpace)
@ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA
Definition: parsenodes.h:3991
@ PUBLICATIONOBJ_TABLES_IN_SCHEMA
Definition: parsenodes.h:3990
@ PUBLICATIONOBJ_TABLE
Definition: parsenodes.h:3989
@ AP_DropObjects
Definition: parsenodes.h:4017
@ AP_SetObjects
Definition: parsenodes.h:4018
@ AP_AddObjects
Definition: parsenodes.h:4016
@ DROP_CASCADE
Definition: parsenodes.h:2170
@ OBJECT_DATABASE
Definition: parsenodes.h:2105
@ OBJECT_PUBLICATION
Definition: parsenodes.h:2126
#define ACL_CREATE
Definition: parsenodes.h:85
int16 attnum
Definition: pg_attribute.h:74
NameData relname
Definition: pg_class.h:38
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:256
#define lfirst(lc)
Definition: pg_list.h:172
#define lfirst_node(type, lc)
Definition: pg_list.h:176
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define linitial_oid(l)
Definition: pg_list.h:180
#define lfirst_oid(lc)
Definition: pg_list.h:174
bool is_schema_publication(Oid pubid)
List * GetPublicationSchemas(Oid pubid)
ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists)
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
List * GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
FormData_pg_publication * Form_pg_publication
@ PUBLICATION_PART_ROOT
@ PUBLICATION_PART_ALL
FormData_pg_publication_namespace * Form_pg_publication_namespace
FormData_pg_publication_rel * Form_pg_publication_rel
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:313
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:165
uintptr_t Datum
Definition: postgres.h:64
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:350
unsigned int Oid
Definition: postgres_ext.h:31
struct rf_context rf_context
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt)
static bool contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
static void AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, List *schemaidlist)
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
void InvalidatePublicationRels(List *relids)
static void CloseTableList(List *rels)
void RemovePublicationSchemaById(Oid psoid)
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
static void TransformPubWhereClauses(List *tables, const char *queryString, bool pubviaroot)
ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root)
bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
bool pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, List **rels, List **schemas)
static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist)
static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context)
void RemovePublicationById(Oid pubid)
static List * OpenTableList(List *tables)
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
static bool check_simple_rowfilter_expr(Node *node, ParseState *pstate)
void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
void RemovePublicationRelById(Oid proid)
ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId)
static void CheckPubRelationColumnList(char *pubname, List *tables, bool publish_schema, bool pubviaroot)
void AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
static void AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, const char *queryString, bool publish_schema)
static void LockSchemaList(List *schemalist)
static bool check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
#define MAX_RELCACHE_INVAL_MSGS
void * stringToNode(const char *str)
Definition: read.c:90
#define RelationGetRelid(relation)
Definition: rel.h:504
#define RelationGetDescr(relation)
Definition: rel.h:530
#define RelationGetRelationName(relation)
Definition: rel.h:538
#define RelationGetNamespace(relation)
Definition: rel.h:545
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition: relcache.c:5231
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
Definition: relcache.h:63
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
char * defname
Definition: parsenodes.h:802
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
Definition: nodes.h:129
const char * p_sourcetext
Definition: parse_node.h:192
PublicationObjSpecType pubobjtype
Definition: parsenodes.h:3999
PublicationTable * pubtable
Definition: parsenodes.h:4001
RangeVar * relation
Definition: parsenodes.h:3979
bool inh
Definition: primnodes.h:85
Form_pg_class rd_rel
Definition: rel.h:111
Definition: primnodes.h:234
AttrNumber varattno
Definition: primnodes.h:246
Bitmapset * bms_replident
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
bool superuser(void)
Definition: superuser.c:46
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:868
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:820
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1081
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:831
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:182
@ PUBLICATIONREL
Definition: syscache.h:84
@ PUBLICATIONOID
Definition: syscache.h:83
@ PUBLICATIONNAME
Definition: syscache.h:80
@ PUBLICATIONNAMESPACE
Definition: syscache.h:81
@ PUBLICATIONNAMESPACEMAP
Definition: syscache.h:82
@ PUBLICATIONRELMAP
Definition: syscache.h:85
@ NAMESPACEOID
Definition: syscache.h:70
#define SearchSysCacheExists1(cacheId, key1)
Definition: syscache.h:191
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:200
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:202
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: table.c:83
#define FirstNormalObjectId
Definition: transam.h:197
#define strVal(v)
Definition: value.h:82
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3456
const char * name
void CommandCounterIncrement(void)
Definition: xact.c:1078
int wal_level
Definition: xlog.c:134
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:74