50 #include "utils/fmgroids.h"
87 bool *publish_via_partition_root_given,
88 bool *publish_via_partition_root)
92 *publish_given =
false;
93 *publish_via_partition_root_given =
false;
100 *publish_via_partition_root =
false;
107 if (strcmp(defel->
defname,
"publish") == 0)
125 *publish_given =
true;
130 (
errcode(ERRCODE_SYNTAX_ERROR),
131 errmsg(
"invalid list syntax in parameter \"%s\"",
135 foreach(lc2, publish_list)
137 char *publish_opt = (
char *)
lfirst(lc2);
139 if (strcmp(publish_opt,
"insert") == 0)
141 else if (strcmp(publish_opt,
"update") == 0)
143 else if (strcmp(publish_opt,
"delete") == 0)
145 else if (strcmp(publish_opt,
"truncate") == 0)
149 (
errcode(ERRCODE_SYNTAX_ERROR),
150 errmsg(
"unrecognized value for publication option \"%s\": \"%s\"",
151 "publish", publish_opt)));
154 else if (strcmp(defel->
defname,
"publish_via_partition_root") == 0)
156 if (*publish_via_partition_root_given)
158 *publish_via_partition_root_given =
true;
163 (
errcode(ERRCODE_SYNTAX_ERROR),
164 errmsg(
"unrecognized publication parameter: \"%s\"", defel->
defname)));
179 if (!pubobjspec_list)
182 foreach(cell, pubobjspec_list)
202 if (search_path ==
NIL)
204 errcode(ERRCODE_UNDEFINED_SCHEMA),
205 errmsg(
"no schema has been selected for CURRENT_SCHEMA"));
279 if (relation->
rd_rel->relreplident == REPLICA_IDENTITY_FULL)
290 if (pubviaroot && relation->
rd_rel->relispartition)
296 publish_as_relid = relid;
307 Anum_pg_publication_rel_prqual,
317 context.
parentid = publish_as_relid;
318 context.
relid = relid;
359 if (pubviaroot && relation->
rd_rel->relispartition)
364 publish_as_relid = relid;
375 Anum_pg_publication_rel_prattrs,
385 if (relation->
rd_rel->relreplident == REPLICA_IDENTITY_FULL)
492 char *errdetail_msg = NULL;
502 errdetail_msg =
_(
"System columns are not allowed.");
509 errdetail_msg =
_(
"User-defined operators are not allowed.");
511 case T_ScalarArrayOpExpr:
514 errdetail_msg =
_(
"User-defined operators are not allowed.");
522 case T_RowCompareExpr:
531 errdetail_msg =
_(
"User-defined operators are not allowed.");
555 errdetail_msg =
_(
"Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
564 if (!errdetail_msg && !
IsA(node,
List))
567 errdetail_msg =
_(
"User-defined types are not allowed.");
570 errdetail_msg =
_(
"User-defined or built-in mutable functions are not allowed.");
573 errdetail_msg =
_(
"User-defined collations are not allowed.");
582 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
583 errmsg(
"invalid publication WHERE expression"),
620 Node *whereclause = NULL;
635 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
636 errmsg(
"cannot use publication WHERE clause for relation \"%s\"",
638 errdetail(
"WHERE clause cannot be used for a partitioned table when %s is false.",
639 "publish_via_partition_root")));
655 "PUBLICATION WHERE");
685 bool publish_schema,
bool pubviaroot)
708 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
709 errmsg(
"cannot use column list for relation \"%s.%s\" in publication \"%s\"",
712 errdetail(
"Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
722 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
723 errmsg(
"cannot use column list for relation \"%s.%s\" in publication \"%s\"",
726 errdetail(
"Column lists cannot be specified for partitioned tables when %s is false.",
727 "publish_via_partition_root")));
740 bool nulls[Natts_pg_publication];
745 bool publish_via_partition_root_given;
746 bool publish_via_partition_root;
760 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
761 errmsg(
"must be superuser to create FOR ALL TABLES publication")));
771 errmsg(
"publication \"%s\" already exists",
776 memset(nulls,
false,
sizeof(nulls));
778 values[Anum_pg_publication_pubname - 1] =
784 &publish_given, &pubactions,
785 &publish_via_partition_root_given,
786 &publish_via_partition_root);
789 Anum_pg_publication_oid);
791 values[Anum_pg_publication_puballtables - 1] =
793 values[Anum_pg_publication_pubinsert - 1] =
795 values[Anum_pg_publication_pubupdate - 1] =
797 values[Anum_pg_publication_pubdelete - 1] =
799 values[Anum_pg_publication_pubtruncate - 1] =
801 values[Anum_pg_publication_pubviaroot - 1] =
818 if (
stmt->for_all_tables)
831 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
832 errmsg(
"must be superuser to create FOR TABLES IN SCHEMA publication"));
834 if (relations !=
NIL)
840 publish_via_partition_root);
844 publish_via_partition_root);
850 if (schemaidlist !=
NIL)
867 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
868 errmsg(
"wal_level is insufficient to publish logical changes"),
869 errhint(
"Set wal_level to \"logical\" before creating subscriptions.")));
881 bool nulls[Natts_pg_publication];
882 bool replaces[Natts_pg_publication];
886 bool publish_via_partition_root_given;
887 bool publish_via_partition_root;
895 &publish_given, &pubactions,
896 &publish_via_partition_root_given,
897 &publish_via_partition_root);
907 if (!pubform->puballtables && publish_via_partition_root_given &&
908 !publish_via_partition_root)
922 foreach(lc, root_relids)
941 has_rowfilter = !
heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
942 has_collist = !
heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
943 if (!has_rowfilter && !has_collist)
950 if (relkind != RELKIND_PARTITIONED_TABLE)
964 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
965 errmsg(
"cannot set parameter \"%s\" to false for publication \"%s\"",
966 "publish_via_partition_root",
968 errdetail(
"The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
969 relname,
"publish_via_partition_root")));
972 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
973 errmsg(
"cannot set parameter \"%s\" to false for publication \"%s\"",
974 "publish_via_partition_root",
976 errdetail(
"The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
977 relname,
"publish_via_partition_root")));
983 memset(nulls,
false,
sizeof(nulls));
984 memset(replaces,
false,
sizeof(replaces));
989 replaces[Anum_pg_publication_pubinsert - 1] =
true;
992 replaces[Anum_pg_publication_pubupdate - 1] =
true;
995 replaces[Anum_pg_publication_pubdelete - 1] =
true;
998 replaces[Anum_pg_publication_pubtruncate - 1] =
true;
1001 if (publish_via_partition_root_given)
1003 values[Anum_pg_publication_pubviaroot - 1] =
BoolGetDatum(publish_via_partition_root);
1004 replaces[Anum_pg_publication_pubviaroot - 1] =
true;
1018 if (pubform->puballtables)
1032 if (root_relids ==
NIL)
1041 foreach(lc, root_relids)
1087 List *tables,
const char *queryString,
1088 bool publish_schema)
1092 Oid pubid = pubform->oid;
1111 pubform->pubviaroot);
1127 pubform->pubviaroot);
1133 foreach(oldlc, oldrelids)
1140 Node *oldrelwhereclause = NULL;
1155 Datum whereClauseDatum;
1156 Datum columnListDatum;
1160 Anum_pg_publication_rel_prqual,
1167 Anum_pg_publication_rel_prattrs,
1176 foreach(newlc, rels)
1193 foreach(lc, newpubrel->
columns)
1231 delrels =
lappend(delrels, oldrel);
1281 foreach(lc, reloids)
1296 if (!
heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1298 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1299 errmsg(
"cannot add schema to publication \"%s\"",
1301 errdetail(
"Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1348 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1349 errmsg(
"must be superuser to add or set schemas")));
1355 if (schemaidlist && pubform->puballtables)
1357 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1358 errmsg(
"publication \"%s\" is defined as FOR ALL TABLES",
1360 errdetail(
"Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1363 if (tables && pubform->puballtables)
1365 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1366 errmsg(
"publication \"%s\" is defined as FOR ALL TABLES",
1368 errdetail(
"Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1391 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1392 errmsg(
"publication \"%s\" does not exist",
1408 Oid pubid = pubform->oid;
1430 errcode(ERRCODE_UNDEFINED_OBJECT),
1431 errmsg(
"publication \"%s\" does not exist",
1435 schemaidlist !=
NIL);
1460 elog(
ERROR,
"cache lookup failed for publication table %u",
1499 elog(
ERROR,
"cache lookup failed for publication %u", pubid);
1504 if (pubform->puballtables)
1530 elog(
ERROR,
"cache lookup failed for publication schema %u", psoid);
1562 List *relids_with_collist =
NIL;
1594 errmsg(
"conflicting or redundant WHERE clauses for table \"%s\"",
1601 errmsg(
"conflicting or redundant column lists for table \"%s\"",
1612 rels =
lappend(rels, pub_rel);
1616 relids_with_rf =
lappend_oid(relids_with_rf, myrelid);
1619 relids_with_collist =
lappend_oid(relids_with_collist, myrelid);
1627 if (recurse && rel->
rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1635 foreach(child, children)
1653 if (childrelid != myrelid &&
1657 errmsg(
"conflicting or redundant WHERE clauses for table \"%s\"",
1665 if (childrelid != myrelid &&
1669 errmsg(
"conflicting or redundant column lists for table \"%s\"",
1684 rels =
lappend(rels, pub_rel);
1688 relids_with_rf =
lappend_oid(relids_with_rf, childrelid);
1691 relids_with_collist =
lappend_oid(relids_with_collist, childrelid);
1730 foreach(lc, schemalist)
1745 errcode(ERRCODE_UNDEFINED_SCHEMA),
1746 errmsg(
"schema with OID %u does not exist", schemaid));
1802 errcode(ERRCODE_SYNTAX_ERROR),
1803 errmsg(
"column list must not be specified in ALTER PUBLICATION ... DROP"));
1814 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1815 errmsg(
"relation \"%s\" is not part of the publication",
1821 (
errcode(ERRCODE_SYNTAX_ERROR),
1822 errmsg(
"cannot use a WHERE clause when removing a table from a publication")));
1840 foreach(lc, schemas)
1867 foreach(lc, schemas)
1872 Anum_pg_publication_namespace_oid,
1881 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1882 errmsg(
"tables from schema \"%s\" are not part of the publication",
1901 if (form->pubowner == newOwnerId)
1924 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1925 errmsg(
"permission denied to change owner of publication \"%s\"",
1927 errhint(
"The owner of a FOR ALL TABLES publication must be a superuser.")));
1931 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1932 errmsg(
"permission denied to change owner of publication \"%s\"",
1934 errhint(
"The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1937 form->pubowner = newOwnerId;
1967 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1968 errmsg(
"publication \"%s\" does not exist",
name)));
1971 subid = pubform->oid;
1999 (
errcode(ERRCODE_UNDEFINED_OBJECT),
2000 errmsg(
"publication with OID %u does not exist", subid)));
void check_can_set_role(Oid member, Oid role)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
#define InvalidAttrNumber
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
int bms_next_member(const Bitmapset *a, int prevbit)
void bms_free(Bitmapset *a)
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
#define TextDatumGetCString(d)
#define OidIsValid(objectId)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
char * get_database_name(Oid dbid)
elog(ERROR, "%s: %s", p2, msg)
bool defGetBoolean(DefElem *def)
char * defGetString(DefElem *def)
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
int errdetail_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
bool equal(const void *a, const void *b)
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define DirectFunctionCall1(func, arg1)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
void CacheInvalidateRelcacheByRelid(Oid relid)
void CacheInvalidateRelcacheAll(void)
if(TABLE==NULL||TABLE_index==NULL)
Assert(fmt[strlen(fmt) - 1] !='\n')
List * list_difference_oid(const List *list1, const List *list2)
List * list_concat_unique_oid(List *list1, const List *list2)
List * lappend(List *list, void *datum)
List * lappend_oid(List *list, Oid datum)
void list_free(List *list)
bool list_member_oid(const List *list, Oid datum)
List * list_append_unique_oid(List *list, Oid datum)
void list_free_deep(List *list)
void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
#define ShareUpdateExclusiveLock
AttrNumber get_attnum(Oid relid, const char *attname)
char * get_namespace_name(Oid nspid)
char get_rel_relkind(Oid relid)
char * get_rel_name(Oid relid)
char func_volatile(Oid funcid)
char * get_attname(Oid relid, AttrNumber attnum, bool missing_ok)
#define CHECK_FOR_INTERRUPTS()
Datum namein(PG_FUNCTION_ARGS)
List * fetch_search_path(bool includeImplicit)
Oid get_namespace_oid(const char *nspname, bool missing_ok)
Oid exprType(const Node *expr)
Oid exprInputCollation(const Node *expr)
bool check_functions_in_node(Node *node, check_function_callback checker, void *context)
Oid exprCollation(const Node *expr)
int exprLocation(const Node *expr)
#define expression_tree_walker(n, w, c)
#define IsA(nodeptr, _type_)
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
ObjectType get_relkind_objtype(char relkind)
const ObjectAddress InvalidObjectAddress
#define ObjectAddressSet(addr, class_id, object_id)
Node * transformWhereClause(ParseState *pstate, Node *clause, ParseExprKind exprKind, const char *constructName)
void assign_expr_collations(ParseState *pstate, Node *expr)
void free_parsestate(ParseState *pstate)
int parser_errposition(ParseState *pstate, int location)
ParseState * make_parsestate(ParseState *parentParseState)
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
void addNSItemToQuery(ParseState *pstate, ParseNamespaceItem *nsitem, bool addToJoinList, bool addToRelNameSpace, bool addToVarNameSpace)
@ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA
@ PUBLICATIONOBJ_TABLES_IN_SCHEMA
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
#define lfirst_node(type, lc)
static int list_length(const List *l)
bool is_schema_publication(Oid pubid)
List * GetPublicationSchemas(Oid pubid)
ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists)
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
List * GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
FormData_pg_publication * Form_pg_publication
FormData_pg_publication_namespace * Form_pg_publication_namespace
FormData_pg_publication_rel * Form_pg_publication_rel
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static Datum CStringGetDatum(const char *X)
struct rf_context rf_context
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt)
static bool contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
static void AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, List *schemaidlist)
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
void InvalidatePublicationRels(List *relids)
static void CloseTableList(List *rels)
void RemovePublicationSchemaById(Oid psoid)
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
static void TransformPubWhereClauses(List *tables, const char *queryString, bool pubviaroot)
ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root)
bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
bool pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, List **rels, List **schemas)
static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist)
static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context)
void RemovePublicationById(Oid pubid)
static List * OpenTableList(List *tables)
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
static bool check_simple_rowfilter_expr(Node *node, ParseState *pstate)
void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
void RemovePublicationRelById(Oid proid)
ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId)
static void CheckPubRelationColumnList(char *pubname, List *tables, bool publish_schema, bool pubviaroot)
void AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
static void AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, const char *queryString, bool publish_schema)
static void LockSchemaList(List *schemalist)
static bool check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
#define MAX_RELCACHE_INVAL_MSGS
void * stringToNode(const char *str)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
#define ERRCODE_DUPLICATE_OBJECT
const char * p_sourcetext
PublicationObjSpecType pubobjtype
PublicationTable * pubtable
Bitmapset * bms_replident
bool superuser_arg(Oid roleid)
#define FirstLowInvalidHeapAttributeNumber
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
#define SearchSysCacheCopy1(cacheId, key1)
@ PUBLICATIONNAMESPACEMAP
#define SearchSysCacheExists1(cacheId, key1)
#define GetSysCacheOid1(cacheId, oidcol, key1)
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
#define FirstNormalObjectId
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
void CommandCounterIncrement(void)