80 bool *publish_via_partition_root_given,
81 bool *publish_via_partition_root)
85 *publish_given =
false;
86 *publish_via_partition_root_given =
false;
93 *publish_via_partition_root =
false;
100 if (strcmp(defel->
defname,
"publish") == 0)
118 *publish_given =
true;
123 (
errcode(ERRCODE_SYNTAX_ERROR),
124 errmsg(
"invalid list syntax in parameter \"%s\"",
128 foreach(lc2, publish_list)
130 char *publish_opt = (
char *)
lfirst(lc2);
132 if (strcmp(publish_opt,
"insert") == 0)
134 else if (strcmp(publish_opt,
"update") == 0)
136 else if (strcmp(publish_opt,
"delete") == 0)
138 else if (strcmp(publish_opt,
"truncate") == 0)
142 (
errcode(ERRCODE_SYNTAX_ERROR),
143 errmsg(
"unrecognized value for publication option \"%s\": \"%s\"",
144 "publish", publish_opt)));
147 else if (strcmp(defel->
defname,
"publish_via_partition_root") == 0)
149 if (*publish_via_partition_root_given)
151 *publish_via_partition_root_given =
true;
156 (
errcode(ERRCODE_SYNTAX_ERROR),
157 errmsg(
"unrecognized publication parameter: \"%s\"", defel->
defname)));
172 if (!pubobjspec_list)
175 foreach(cell, pubobjspec_list)
195 if (search_path ==
NIL)
197 errcode(ERRCODE_UNDEFINED_SCHEMA),
198 errmsg(
"no schema has been selected for CURRENT_SCHEMA"));
272 if (relation->
rd_rel->relreplident == REPLICA_IDENTITY_FULL)
283 if (pubviaroot && relation->
rd_rel->relispartition)
289 publish_as_relid = relid;
300 Anum_pg_publication_rel_prqual,
310 context.
parentid = publish_as_relid;
311 context.
relid = relid;
352 if (pubviaroot && relation->
rd_rel->relispartition)
357 publish_as_relid = relid;
368 Anum_pg_publication_rel_prattrs,
378 if (relation->
rd_rel->relreplident == REPLICA_IDENTITY_FULL)
485 char *errdetail_msg = NULL;
495 errdetail_msg =
_(
"System columns are not allowed.");
502 errdetail_msg =
_(
"User-defined operators are not allowed.");
504 case T_ScalarArrayOpExpr:
507 errdetail_msg =
_(
"User-defined operators are not allowed.");
515 case T_RowCompareExpr:
524 errdetail_msg =
_(
"User-defined operators are not allowed.");
548 errdetail_msg =
_(
"Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
557 if (!errdetail_msg && !
IsA(node,
List))
560 errdetail_msg =
_(
"User-defined types are not allowed.");
563 errdetail_msg =
_(
"User-defined or built-in mutable functions are not allowed.");
566 errdetail_msg =
_(
"User-defined collations are not allowed.");
575 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
576 errmsg(
"invalid publication WHERE expression"),
613 Node *whereclause = NULL;
628 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
629 errmsg(
"cannot use publication WHERE clause for relation \"%s\"",
631 errdetail(
"WHERE clause cannot be used for a partitioned table when %s is false.",
632 "publish_via_partition_root")));
648 "PUBLICATION WHERE");
678 bool publish_schema,
bool pubviaroot)
701 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
702 errmsg(
"cannot use column list for relation \"%s.%s\" in publication \"%s\"",
705 errdetail(
"Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
715 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
716 errmsg(
"cannot use column list for relation \"%s.%s\" in publication \"%s\"",
719 errdetail(
"Column lists cannot be specified for partitioned tables when %s is false.",
720 "publish_via_partition_root")));
733 bool nulls[Natts_pg_publication];
738 bool publish_via_partition_root_given;
739 bool publish_via_partition_root;
753 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
754 errmsg(
"must be superuser to create FOR ALL TABLES publication")));
764 errmsg(
"publication \"%s\" already exists",
769 memset(nulls,
false,
sizeof(nulls));
771 values[Anum_pg_publication_pubname - 1] =
777 &publish_given, &pubactions,
778 &publish_via_partition_root_given,
779 &publish_via_partition_root);
782 Anum_pg_publication_oid);
784 values[Anum_pg_publication_puballtables - 1] =
786 values[Anum_pg_publication_pubinsert - 1] =
788 values[Anum_pg_publication_pubupdate - 1] =
790 values[Anum_pg_publication_pubdelete - 1] =
792 values[Anum_pg_publication_pubtruncate - 1] =
794 values[Anum_pg_publication_pubviaroot - 1] =
811 if (
stmt->for_all_tables)
824 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
825 errmsg(
"must be superuser to create FOR TABLES IN SCHEMA publication"));
827 if (relations !=
NIL)
833 publish_via_partition_root);
837 publish_via_partition_root);
843 if (schemaidlist !=
NIL)
860 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
861 errmsg(
"wal_level is insufficient to publish logical changes"),
862 errhint(
"Set wal_level to \"logical\" before creating subscriptions.")));
874 bool nulls[Natts_pg_publication];
875 bool replaces[Natts_pg_publication];
879 bool publish_via_partition_root_given;
880 bool publish_via_partition_root;
888 &publish_given, &pubactions,
889 &publish_via_partition_root_given,
890 &publish_via_partition_root);
900 if (!pubform->puballtables && publish_via_partition_root_given &&
901 !publish_via_partition_root)
915 foreach(lc, root_relids)
934 has_rowfilter = !
heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
935 has_collist = !
heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
936 if (!has_rowfilter && !has_collist)
943 if (relkind != RELKIND_PARTITIONED_TABLE)
957 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
958 errmsg(
"cannot set parameter \"%s\" to false for publication \"%s\"",
959 "publish_via_partition_root",
961 errdetail(
"The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
962 relname,
"publish_via_partition_root")));
965 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
966 errmsg(
"cannot set parameter \"%s\" to false for publication \"%s\"",
967 "publish_via_partition_root",
969 errdetail(
"The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
970 relname,
"publish_via_partition_root")));
976 memset(nulls,
false,
sizeof(nulls));
977 memset(replaces,
false,
sizeof(replaces));
982 replaces[Anum_pg_publication_pubinsert - 1] =
true;
985 replaces[Anum_pg_publication_pubupdate - 1] =
true;
988 replaces[Anum_pg_publication_pubdelete - 1] =
true;
991 replaces[Anum_pg_publication_pubtruncate - 1] =
true;
994 if (publish_via_partition_root_given)
996 values[Anum_pg_publication_pubviaroot - 1] =
BoolGetDatum(publish_via_partition_root);
997 replaces[Anum_pg_publication_pubviaroot - 1] =
true;
1011 if (pubform->puballtables)
1025 if (root_relids ==
NIL)
1034 foreach(lc, root_relids)
1080 List *tables,
const char *queryString,
1081 bool publish_schema)
1085 Oid pubid = pubform->oid;
1104 pubform->pubviaroot);
1120 pubform->pubviaroot);
1126 foreach(oldlc, oldrelids)
1133 Node *oldrelwhereclause = NULL;
1148 Datum whereClauseDatum;
1149 Datum columnListDatum;
1153 Anum_pg_publication_rel_prqual,
1160 Anum_pg_publication_rel_prattrs,
1169 foreach(newlc, rels)
1186 foreach(lc, newpubrel->
columns)
1224 delrels =
lappend(delrels, oldrel);
1274 foreach(lc, reloids)
1289 if (!
heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1291 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1292 errmsg(
"cannot add schema to publication \"%s\"",
1294 errdetail(
"Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1341 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1342 errmsg(
"must be superuser to add or set schemas")));
1348 if (schemaidlist && pubform->puballtables)
1350 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1351 errmsg(
"publication \"%s\" is defined as FOR ALL TABLES",
1353 errdetail(
"Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1356 if (tables && pubform->puballtables)
1358 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1359 errmsg(
"publication \"%s\" is defined as FOR ALL TABLES",
1361 errdetail(
"Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1384 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1385 errmsg(
"publication \"%s\" does not exist",
1401 Oid pubid = pubform->oid;
1423 errcode(ERRCODE_UNDEFINED_OBJECT),
1424 errmsg(
"publication \"%s\" does not exist",
1428 schemaidlist !=
NIL);
1453 elog(
ERROR,
"cache lookup failed for publication table %u",
1492 elog(
ERROR,
"cache lookup failed for publication %u", pubid);
1497 if (pubform->puballtables)
1523 elog(
ERROR,
"cache lookup failed for publication schema %u", psoid);
1555 List *relids_with_collist =
NIL;
1587 errmsg(
"conflicting or redundant WHERE clauses for table \"%s\"",
1594 errmsg(
"conflicting or redundant column lists for table \"%s\"",
1605 rels =
lappend(rels, pub_rel);
1609 relids_with_rf =
lappend_oid(relids_with_rf, myrelid);
1612 relids_with_collist =
lappend_oid(relids_with_collist, myrelid);
1620 if (recurse && rel->
rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1628 foreach(child, children)
1646 if (childrelid != myrelid &&
1650 errmsg(
"conflicting or redundant WHERE clauses for table \"%s\"",
1658 if (childrelid != myrelid &&
1662 errmsg(
"conflicting or redundant column lists for table \"%s\"",
1677 rels =
lappend(rels, pub_rel);
1681 relids_with_rf =
lappend_oid(relids_with_rf, childrelid);
1684 relids_with_collist =
lappend_oid(relids_with_collist, childrelid);
1723 foreach(lc, schemalist)
1738 errcode(ERRCODE_UNDEFINED_SCHEMA),
1739 errmsg(
"schema with OID %u does not exist", schemaid));
1795 errcode(ERRCODE_SYNTAX_ERROR),
1796 errmsg(
"column list must not be specified in ALTER PUBLICATION ... DROP"));
1798 prid =
GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1807 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1808 errmsg(
"relation \"%s\" is not part of the publication",
1814 (
errcode(ERRCODE_SYNTAX_ERROR),
1815 errmsg(
"cannot use a WHERE clause when removing a table from a publication")));
1833 foreach(lc, schemas)
1860 foreach(lc, schemas)
1865 Anum_pg_publication_namespace_oid,
1874 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1875 errmsg(
"tables from schema \"%s\" are not part of the publication",
1894 if (form->pubowner == newOwnerId)
1917 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1918 errmsg(
"permission denied to change owner of publication \"%s\"",
1920 errhint(
"The owner of a FOR ALL TABLES publication must be a superuser.")));
1924 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1925 errmsg(
"permission denied to change owner of publication \"%s\"",
1927 errhint(
"The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1930 form->pubowner = newOwnerId;
1960 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1961 errmsg(
"publication \"%s\" does not exist",
name)));
1964 subid = pubform->oid;
1992 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1993 errmsg(
"publication with OID %u does not exist", subid)));
void check_can_set_role(Oid member, Oid role)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
#define InvalidAttrNumber
bool bms_equal(const Bitmapset *a, const Bitmapset *b)
int bms_next_member(const Bitmapset *a, int prevbit)
void bms_free(Bitmapset *a)
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
#define TextDatumGetCString(d)
#define OidIsValid(objectId)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
char * get_database_name(Oid dbid)
bool defGetBoolean(DefElem *def)
char * defGetString(DefElem *def)
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
int errdetail_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
bool equal(const void *a, const void *b)
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define DirectFunctionCall1(func, arg1)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
bool heap_attisnull(HeapTuple tup, int attnum, TupleDesc tupleDesc)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
void CacheInvalidateRelcacheByRelid(Oid relid)
void CacheInvalidateRelcacheAll(void)
if(TABLE==NULL||TABLE_index==NULL)
Assert(fmt[strlen(fmt) - 1] !='\n')
List * list_difference_oid(const List *list1, const List *list2)
List * list_concat_unique_oid(List *list1, const List *list2)
List * lappend(List *list, void *datum)
List * lappend_oid(List *list, Oid datum)
void list_free(List *list)
bool list_member_oid(const List *list, Oid datum)
List * list_append_unique_oid(List *list, Oid datum)
void list_free_deep(List *list)
void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
#define ShareUpdateExclusiveLock
AttrNumber get_attnum(Oid relid, const char *attname)
char * get_namespace_name(Oid nspid)
char get_rel_relkind(Oid relid)
char * get_rel_name(Oid relid)
char func_volatile(Oid funcid)
char * get_attname(Oid relid, AttrNumber attnum, bool missing_ok)
#define CHECK_FOR_INTERRUPTS()
Datum namein(PG_FUNCTION_ARGS)
List * fetch_search_path(bool includeImplicit)
Oid get_namespace_oid(const char *nspname, bool missing_ok)
Oid exprType(const Node *expr)
Oid exprInputCollation(const Node *expr)
bool check_functions_in_node(Node *node, check_function_callback checker, void *context)
Oid exprCollation(const Node *expr)
int exprLocation(const Node *expr)
#define expression_tree_walker(n, w, c)
#define IsA(nodeptr, _type_)
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
ObjectType get_relkind_objtype(char relkind)
const ObjectAddress InvalidObjectAddress
#define ObjectAddressSet(addr, class_id, object_id)
Node * transformWhereClause(ParseState *pstate, Node *clause, ParseExprKind exprKind, const char *constructName)
void assign_expr_collations(ParseState *pstate, Node *expr)
void free_parsestate(ParseState *pstate)
int parser_errposition(ParseState *pstate, int location)
ParseState * make_parsestate(ParseState *parentParseState)
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
void addNSItemToQuery(ParseState *pstate, ParseNamespaceItem *nsitem, bool addToJoinList, bool addToRelNameSpace, bool addToVarNameSpace)
@ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA
@ PUBLICATIONOBJ_TABLES_IN_SCHEMA
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
#define lfirst_node(type, lc)
static int list_length(const List *l)
bool is_schema_publication(Oid pubid)
List * GetPublicationSchemas(Oid pubid)
ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists)
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
Bitmapset * pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
List * GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
FormData_pg_publication * Form_pg_publication
FormData_pg_publication_namespace * Form_pg_publication_namespace
FormData_pg_publication_rel * Form_pg_publication_rel
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static Datum CStringGetDatum(const char *X)
struct rf_context rf_context
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt)
static bool contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
static void AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, List *schemaidlist)
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
void InvalidatePublicationRels(List *relids)
static void CloseTableList(List *rels)
void RemovePublicationSchemaById(Oid psoid)
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
static void TransformPubWhereClauses(List *tables, const char *queryString, bool pubviaroot)
ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root)
bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
bool pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, List **rels, List **schemas)
static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist)
static bool contain_mutable_or_user_functions_checker(Oid func_id, void *context)
void RemovePublicationById(Oid pubid)
static List * OpenTableList(List *tables)
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
static bool check_simple_rowfilter_expr(Node *node, ParseState *pstate)
void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
void RemovePublicationRelById(Oid proid)
ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId)
static void CheckPubRelationColumnList(char *pubname, List *tables, bool publish_schema, bool pubviaroot)
void AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
static void AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, const char *queryString, bool publish_schema)
static void LockSchemaList(List *schemalist)
static bool check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
#define MAX_RELCACHE_INVAL_MSGS
void * stringToNode(const char *str)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
#define ERRCODE_DUPLICATE_OBJECT
const char * p_sourcetext
PublicationObjSpecType pubobjtype
PublicationTable * pubtable
Bitmapset * bms_replident
bool superuser_arg(Oid roleid)
#define FirstLowInvalidHeapAttributeNumber
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
#define SearchSysCacheCopy1(cacheId, key1)
#define SearchSysCacheExists1(cacheId, key1)
#define GetSysCacheOid1(cacheId, oidcol, key1)
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
#define FirstNormalObjectId
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
void CommandCounterIncrement(void)