PostgreSQL Source Code git master
publicationcmds.h File Reference
Include dependency graph for publicationcmds.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define MAX_RELCACHE_INVAL_MSGS   4096
 

Functions

ObjectAddress CreatePublication (ParseState *pstate, CreatePublicationStmt *stmt)
 
void AlterPublication (ParseState *pstate, AlterPublicationStmt *stmt)
 
void RemovePublicationById (Oid pubid)
 
void RemovePublicationRelById (Oid proid)
 
void RemovePublicationSchemaById (Oid psoid)
 
ObjectAddress AlterPublicationOwner (const char *name, Oid newOwnerId)
 
void AlterPublicationOwner_oid (Oid pubid, Oid newOwnerId)
 
void InvalidatePublicationRels (List *relids)
 
bool pub_rf_contains_invalid_column (Oid pubid, Relation relation, List *ancestors, bool pubviaroot)
 
bool pub_contains_invalid_column (Oid pubid, Relation relation, List *ancestors, bool pubviaroot, char pubgencols_type, bool *invalid_column_list, bool *invalid_gen_col)
 
void InvalidatePubRelSyncCache (Oid pubid, bool puballtables)
 

Macro Definition Documentation

◆ MAX_RELCACHE_INVAL_MSGS

#define MAX_RELCACHE_INVAL_MSGS   4096

Definition at line 23 of file publicationcmds.h.

Function Documentation

◆ AlterPublication()

void AlterPublication ( ParseState *pstate,
AlterPublicationStmt *stmt
)

Definition at line 1530 of file publicationcmds.c.

1531{
1532 Relation rel;
1533 HeapTuple tup;
1534 Form_pg_publication pubform;
1535
1536 rel = table_open(PublicationRelationId, RowExclusiveLock);
1537
1538 tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1539 CStringGetDatum(stmt->pubname));
1540
1541 if (!HeapTupleIsValid(tup))
1542 ereport(ERROR,
1543 (errcode(ERRCODE_UNDEFINED_OBJECT),
1544 errmsg("publication \"%s\" does not exist",
1545 stmt->pubname)));
1546
1547 pubform = (Form_pg_publication) GETSTRUCT(tup);
1548
1549 /* must be owner */
1550 if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1551 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1552 stmt->pubname);
1553
1554 if (stmt->options)
1555 AlterPublicationOptions(pstate, stmt, rel, tup);
1556 else
1557 {
1558 List *relations = NIL;
1559 List *schemaidlist = NIL;
1560 Oid pubid = pubform->oid;
1561
1562 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1563 &schemaidlist);
1564
1565 CheckAlterPublication(stmt, tup, relations, schemaidlist);
1566
1567 heap_freetuple(tup);
1568
1569 /* Lock the publication so nobody else can do anything with it. */
1570 LockDatabaseObject(PublicationRelationId, pubid, 0,
1571 AccessExclusiveLock);
1572
1573 /*
1574 * It is possible that by the time we acquire the lock on publication,
1575 * concurrent DDL has removed it. We can test this by checking the
1576 * existence of publication. We get the tuple again to avoid the risk
1577 * of any publication option getting changed.
1578 */
1579 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1580 if (!HeapTupleIsValid(tup))
1581 ereport(ERROR,
1582 errcode(ERRCODE_UNDEFINED_OBJECT),
1583 errmsg("publication \"%s\" does not exist",
1584 stmt->pubname));
1585
1586 AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1587 schemaidlist != NIL);
1588 AlterPublicationSchemas(stmt, tup, schemaidlist);
1589 }
1590
1591 /* Cleanup. */
1592 heap_freetuple(tup);
1593 table_close(rel, RowExclusiveLock);
1594}
@ ACLCHECK_NOT_OWNER
Definition: acl.h:185
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2652
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition: aclchk.c:4088
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:150
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
#define stmt
Definition: indent_codes.h:59
void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1008
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
Oid GetUserId(void)
Definition: miscinit.c:469
@ OBJECT_PUBLICATION
Definition: parsenodes.h:2355
#define NIL
Definition: pg_list.h:68
FormData_pg_publication * Form_pg_publication
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:360
unsigned int Oid
Definition: postgres_ext.h:32
static void AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, List *schemaidlist)
static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, List **rels, List **schemas)
static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist)
static void AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, const char *queryString, bool publish_schema)
Definition: pg_list.h:54
const char * p_sourcetext
Definition: parse_node.h:195
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:91
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, AlterPublicationOptions(), AlterPublicationSchemas(), AlterPublicationTables(), CheckAlterPublication(), CStringGetDatum(), ereport, errcode(), errmsg(), ERROR, GETSTRUCT(), GetUserId(), heap_freetuple(), HeapTupleIsValid, LockDatabaseObject(), NIL, object_ownercheck(), OBJECT_PUBLICATION, ObjectIdGetDatum(), ObjectsInPublicationToOids(), ParseState::p_sourcetext, RowExclusiveLock, SearchSysCacheCopy1, stmt, table_close(), and table_open().

Referenced by ProcessUtilitySlow().

◆ AlterPublicationOwner()

ObjectAddress AlterPublicationOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 2098 of file publicationcmds.c.

2099{
2100 Oid pubid;
2101 HeapTuple tup;
2102 Relation rel;
2103 ObjectAddress address;
2104 Form_pg_publication pubform;
2105
2106 rel = table_open(PublicationRelationId, RowExclusiveLock);
2107
2108 tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2109
2110 if (!HeapTupleIsValid(tup))
2111 ereport(ERROR,
2112 (errcode(ERRCODE_UNDEFINED_OBJECT),
2113 errmsg("publication \"%s\" does not exist", name)));
2114
2115 pubform = (Form_pg_publication) GETSTRUCT(tup);
2116 pubid = pubform->oid;
2117
2118 AlterPublicationOwner_internal(rel, tup, newOwnerId);
2119
2120 ObjectAddressSet(address, PublicationRelationId, pubid);
2121
2122 heap_freetuple(tup);
2123
2124 table_close(rel, RowExclusiveLock);
2125
2126 return address;
2127}
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
const char * name

References AlterPublicationOwner_internal(), CStringGetDatum(), ereport, errcode(), errmsg(), ERROR, GETSTRUCT(), heap_freetuple(), HeapTupleIsValid, name, ObjectAddressSet, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

◆ AlterPublicationOwner_oid()

void AlterPublicationOwner_oid ( Oid  pubid,
Oid  newOwnerId 
)

Definition at line 2133 of file publicationcmds.c.

2134{
2135 HeapTuple tup;
2136 Relation rel;
2137
2138 rel = table_open(PublicationRelationId, RowExclusiveLock);
2139
2140 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
2141
2142 if (!HeapTupleIsValid(tup))
2143 ereport(ERROR,
2144 (errcode(ERRCODE_UNDEFINED_OBJECT),
2145 errmsg("publication with OID %u does not exist", pubid)));
2146
2147 AlterPublicationOwner_internal(rel, tup, newOwnerId);
2148
2149 heap_freetuple(tup);
2150
2151 table_close(rel, RowExclusiveLock);
2152}

References AlterPublicationOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by shdepReassignOwned_Owner().

◆ CreatePublication()

ObjectAddress CreatePublication ( ParseState *pstate,
CreatePublicationStmt *stmt
)

Definition at line 826 of file publicationcmds.c.

827{
828 Relation rel;
829 ObjectAddress myself;
830 Oid puboid;
831 bool nulls[Natts_pg_publication];
832 Datum values[Natts_pg_publication];
833 HeapTuple tup;
834 bool publish_given;
835 PublicationActions pubactions;
836 bool publish_via_partition_root_given;
837 bool publish_via_partition_root;
838 bool publish_generated_columns_given;
839 char publish_generated_columns;
840 AclResult aclresult;
841 List *relations = NIL;
842 List *schemaidlist = NIL;
843
844 /* must have CREATE privilege on database */
845 aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
846 if (aclresult != ACLCHECK_OK)
847 aclcheck_error(aclresult, OBJECT_DATABASE,
848 get_database_name(MyDatabaseId));
849
850 /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
851 if (!superuser())
852 {
853 if (stmt->for_all_tables || stmt->for_all_sequences)
854 ereport(ERROR,
855 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
856 errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
857 }
858
859 rel = table_open(PublicationRelationId, RowExclusiveLock);
860
861 /* Check if name is used */
862 puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
863 CStringGetDatum(stmt->pubname));
864 if (OidIsValid(puboid))
865 ereport(ERROR,
866 (errcode(ERRCODE_DUPLICATE_OBJECT),
867 errmsg("publication \"%s\" already exists",
868 stmt->pubname)));
869
870 /* Form a tuple. */
871 memset(values, 0, sizeof(values));
872 memset(nulls, false, sizeof(nulls));
873
874 values[Anum_pg_publication_pubname - 1] =
875 DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
876 values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
877
878 parse_publication_options(pstate,
879 stmt->options,
880 &publish_given, &pubactions,
881 &publish_via_partition_root_given,
882 &publish_via_partition_root,
883 &publish_generated_columns_given,
884 &publish_generated_columns);
885
886 if (stmt->for_all_sequences &&
887 (publish_given || publish_via_partition_root_given ||
888 publish_generated_columns_given))
889 ereport(NOTICE,
890 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
891 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
892
893 puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
894 Anum_pg_publication_oid);
895 values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
896 values[Anum_pg_publication_puballtables - 1] =
897 BoolGetDatum(stmt->for_all_tables);
898 values[Anum_pg_publication_puballsequences - 1] =
899 BoolGetDatum(stmt->for_all_sequences);
900 values[Anum_pg_publication_pubinsert - 1] =
901 BoolGetDatum(pubactions.pubinsert);
902 values[Anum_pg_publication_pubupdate - 1] =
903 BoolGetDatum(pubactions.pubupdate);
904 values[Anum_pg_publication_pubdelete - 1] =
905 BoolGetDatum(pubactions.pubdelete);
906 values[Anum_pg_publication_pubtruncate - 1] =
907 BoolGetDatum(pubactions.pubtruncate);
908 values[Anum_pg_publication_pubviaroot - 1] =
909 BoolGetDatum(publish_via_partition_root);
910 values[Anum_pg_publication_pubgencols - 1] =
911 CharGetDatum(publish_generated_columns);
912
913 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
914
915 /* Insert tuple into catalog. */
916 CatalogTupleInsert(rel, tup);
917 heap_freetuple(tup);
918
919 recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
920
921 ObjectAddressSet(myself, PublicationRelationId, puboid);
922
923 /* Make the changes visible. */
924 CommandCounterIncrement();
925
926 /* Associate objects with the publication. */
927 if (stmt->for_all_tables)
928 {
929 /*
930 * Invalidate relcache so that publication info is rebuilt. Sequences
931 * publication doesn't require invalidation, as replica identity
932 * checks don't apply to them.
933 */
934 CacheInvalidateRelcacheAll();
935 }
936 else if (!stmt->for_all_sequences)
937 {
938 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
939 &schemaidlist);
940
941 /* FOR TABLES IN SCHEMA requires superuser */
942 if (schemaidlist != NIL && !superuser())
943 ereport(ERROR,
944 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
945 errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
946
947 if (relations != NIL)
948 {
949 List *rels;
950
951 rels = OpenTableList(relations);
952 TransformPubWhereClauses(rels, pstate->p_sourcetext,
953 publish_via_partition_root);
954
955 CheckPubRelationColumnList(stmt->pubname, rels,
956 schemaidlist != NIL,
957 publish_via_partition_root);
958
959 PublicationAddTables(puboid, rels, true, NULL);
960 CloseTableList(rels);
961 }
962
963 if (schemaidlist != NIL)
964 {
965 /*
966 * Schema lock is held until the publication is created to prevent
967 * concurrent schema deletion.
968 */
969 LockSchemaList(schemaidlist);
970 PublicationAddSchemas(puboid, schemaidlist, true, NULL);
971 }
972 }
973
974 table_close(rel, RowExclusiveLock);
975
976 InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
977
978 if (wal_level != WAL_LEVEL_LOGICAL)
979 ereport(WARNING,
980 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
981 errmsg("\"wal_level\" is insufficient to publish logical changes"),
982 errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
983
984 return myself;
985}
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition: aclchk.c:3834
static Datum values[MAXATTR]
Definition: bootstrap.c:153
#define OidIsValid(objectId)
Definition: c.h:777
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:448
int errhint(const char *fmt,...)
Definition: elog.c:1330
#define WARNING
Definition: elog.h:36
#define NOTICE
Definition: elog.h:35
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:682
Oid MyDatabaseId
Definition: globals.c:94
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
void CacheInvalidateRelcacheAll(void)
Definition: inval.c:1654
char * get_database_name(Oid dbid)
Definition: lsyscache.c:1259
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:173
@ OBJECT_DATABASE
Definition: parsenodes.h:2334
#define ACL_CREATE
Definition: parsenodes.h:85
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:168
static Datum BoolGetDatum(bool X)
Definition: postgres.h:112
uint64_t Datum
Definition: postgres.h:70
static Datum CharGetDatum(char X)
Definition: postgres.h:132
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt)
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
static void CloseTableList(List *rels)
static void TransformPubWhereClauses(List *tables, const char *queryString, bool pubviaroot)
static List * OpenTableList(List *tables)
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root, bool *publish_generated_columns_given, char *publish_generated_columns)
static void CheckPubRelationColumnList(char *pubname, List *tables, bool publish_schema, bool pubviaroot)
static void LockSchemaList(List *schemalist)
#define RelationGetDescr(relation)
Definition: rel.h:541
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
bool superuser(void)
Definition: superuser.c:46
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:109
void CommandCounterIncrement(void)
Definition: xact.c:1101
int wal_level
Definition: xlog.c:133
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:76

References ACL_CREATE, aclcheck_error(), ACLCHECK_OK, BoolGetDatum(), CacheInvalidateRelcacheAll(), CatalogTupleInsert(), CharGetDatum(), CheckPubRelationColumnList(), CloseTableList(), CommandCounterIncrement(), CStringGetDatum(), DirectFunctionCall1, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg(), ERROR, get_database_name(), GetNewOidWithIndex(), GetSysCacheOid1, GetUserId(), heap_form_tuple(), heap_freetuple(), InvokeObjectPostCreateHook, LockSchemaList(), MyDatabaseId, namein(), NIL, NOTICE, object_aclcheck(), OBJECT_DATABASE, ObjectAddressSet, ObjectIdGetDatum(), ObjectsInPublicationToOids(), OidIsValid, OpenTableList(), ParseState::p_sourcetext, parse_publication_options(), PublicationActions::pubdelete, PublicationActions::pubinsert, PublicationAddSchemas(), PublicationAddTables(), PublicationActions::pubtruncate, PublicationActions::pubupdate, recordDependencyOnOwner(), RelationGetDescr, RowExclusiveLock, stmt, superuser(), table_close(), table_open(), TransformPubWhereClauses(), values, wal_level, WAL_LEVEL_LOGICAL, and WARNING.

Referenced by ProcessUtilitySlow().

◆ InvalidatePublicationRels()

void InvalidatePublicationRels ( List *relids)

Definition at line 1195 of file publicationcmds.c.

1196{
1197 /*
1198 * We don't want to send too many individual messages, at some point it's
1199 * cheaper to just reset whole relcache.
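1200 */
1201 if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1202 {
1203 ListCell *lc;
1204
1205 foreach(lc, relids)
1206 CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1207 }
1208 else
1209 CacheInvalidateRelcacheAll();
1210}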
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1687
static int list_length(const List *l)
Definition: pg_list.h:152
#define lfirst_oid(lc)
Definition: pg_list.h:174
#define MAX_RELCACHE_INVAL_MSGS

References CacheInvalidateRelcacheAll(), CacheInvalidateRelcacheByRelid(), lfirst_oid, list_length(), and MAX_RELCACHE_INVAL_MSGS.

Referenced by AlterPublicationOptions(), publication_add_relation(), publication_add_schema(), RemovePublicationRelById(), and RemovePublicationSchemaById().

◆ InvalidatePubRelSyncCache()

void InvalidatePubRelSyncCache ( Oid  pubid,
bool  puballtables 
)

Definition at line 500 of file publicationcmds.c.

501{
502 if (puballtables)
503 {
504 CacheInvalidateRelSyncAll();
505 }
506 else
507 {
508 List *relids = NIL;
509 List *schemarelids = NIL;
510
511 /*
512 * For partitioned tables, we must invalidate all partitions and
513 * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
514 * a target. However, WAL records for TRUNCATE specify both a root and
515 * its leaves.
516 */
517 relids = GetPublicationRelations(pubid,
518 PUBLICATION_PART_ALL);
519 schemarelids = GetAllSchemaPublicationRelations(pubid,
520 PUBLICATION_PART_ALL);
521
522 relids = list_concat_unique_oid(relids, schemarelids);
523
524 /* Invalidate the relsyncache */
525 foreach_oid(relid, relids)
526 CacheInvalidateRelSync(relid);
527 }
528
529 return;
530}
void CacheInvalidateRelSyncAll(void)
Definition: inval.c:1720
void CacheInvalidateRelSync(Oid relid)
Definition: inval.c:1708
List * list_concat_unique_oid(List *list1, const List *list2)
Definition: list.c:1469
#define foreach_oid(var, lst)
Definition: pg_list.h:471
List * GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
@ PUBLICATION_PART_ALL

References CacheInvalidateRelSync(), CacheInvalidateRelSyncAll(), foreach_oid, GetAllSchemaPublicationRelations(), GetPublicationRelations(), list_concat_unique_oid(), NIL, and PUBLICATION_PART_ALL.

Referenced by AlterObjectRename_internal().

◆ pub_contains_invalid_column()

bool pub_contains_invalid_column ( Oid  pubid,
Relation  relation,
List *ancestors,
bool  pubviaroot,
char  pubgencols_type,
bool *  invalid_column_list,
bool *  invalid_gen_col 
)

Definition at line 356 of file publicationcmds.c.

360{
361 Oid relid = RelationGetRelid(relation);
362 Oid publish_as_relid = RelationGetRelid(relation);
363 Bitmapset *idattrs;
364 Bitmapset *columns = NULL;
365 TupleDesc desc = RelationGetDescr(relation);
366 Publication *pub;
367 int x;
368
369 *invalid_column_list = false;
370 *invalid_gen_col = false;
371
372 /*
373 * For a partition, if pubviaroot is true, find the topmost ancestor that
374 * is published via this publication as we need to use its column list for
375 * the changes.
376 *
377 * Note that even though the column list used is for an ancestor, the
378 * REPLICA IDENTITY used will be for the actual child table.
379 */
380 if (pubviaroot && relation->rd_rel->relispartition)
381 {
382 publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
383
384 if (!OidIsValid(publish_as_relid))
385 publish_as_relid = relid;
386 }
387
388 /* Fetch the column list */
389 pub = GetPublication(pubid);
390 check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
391
392 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
393 {
394 /* With REPLICA IDENTITY FULL, no column list is allowed. */
395 *invalid_column_list = (columns != NULL);
396
397 /*
398 * As we don't allow a column list with REPLICA IDENTITY FULL, the
399 * publish_generated_columns option must be set to stored if the table
400 * has any stored generated columns.
401 */
402 if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
403 relation->rd_att->constr &&
404 relation->rd_att->constr->has_generated_stored)
405 *invalid_gen_col = true;
406
407 /*
408 * Virtual generated columns are currently not supported for logical
409 * replication at all.
410 */
411 if (relation->rd_att->constr &&
412 relation->rd_att->constr->has_generated_virtual)
413 *invalid_gen_col = true;
414
415 if (*invalid_gen_col && *invalid_column_list)
416 return true;
417 }
418
419 /* Remember columns that are part of the REPLICA IDENTITY */
420 idattrs = RelationGetIndexAttrBitmap(relation,
421 INDEX_ATTR_BITMAP_IDENTITY_KEY);
422
423 /*
424 * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
425 * (to handle system columns the usual way), while column list does not
426 * use offset, so we can't do bms_is_subset(). Instead, we have to loop
427 * over the idattrs and check all of them are in the list.
428 */
429 x = -1;
430 while ((x = bms_next_member(idattrs, x)) >= 0)
431 {
432 AttrNumber attnum = x + FirstLowInvalidHeapAttributeNumber;
433 Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
434
435 if (columns == NULL)
436 {
437 /*
438 * The publish_generated_columns option must be set to stored if
439 * the REPLICA IDENTITY contains any stored generated column.
440 */
441 if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
442 {
443 *invalid_gen_col = true;
444 break;
445 }
446
447 /*
448 * The equivalent setting for virtual generated columns does not
449 * exist yet.
450 */
451 if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
452 {
453 *invalid_gen_col = true;
454 break;
455 }
456
457 /* Skip validating the column list since it is not defined */
458 continue;
459 }
460
461 /*
462 * If pubviaroot is true, we are validating the column list of the
463 * parent table, but the bitmap contains the replica identity
464 * information of the child table. The parent/child attnums may not
465 * match, so translate them to the parent - get the attname from the
466 * child, and look it up in the parent.
467 */
468 if (pubviaroot)
469 {
470 /* attribute name in the child table */
471 char *colname = get_attname(relid, attnum, false);
472
473 /*
474 * Determine the attnum for the attribute name in parent (we are
475 * using the column list defined on the parent).
476 */
477 attnum = get_attnum(publish_as_relid, colname);
478 }
479
480 /* replica identity column, not covered by the column list */
481 *invalid_column_list |= !bms_is_member(attnum, columns);
482
483 if (*invalid_column_list && *invalid_gen_col)
484 break;
485 }
486
487 bms_free(columns);
488 bms_free(idattrs);
489
490 return *invalid_column_list || *invalid_gen_col;
491}
int16 AttrNumber
Definition: attnum.h:21
int bms_next_member(const Bitmapset *a, int prevbit)
Definition: bitmapset.c:1305
void bms_free(Bitmapset *a)
Definition: bitmapset.c:239
bool bms_is_member(int x, const Bitmapset *a)
Definition: bitmapset.c:510
int x
Definition: isn.c:75
AttrNumber get_attnum(Oid relid, const char *attname)
Definition: lsyscache.c:951
char * get_attname(Oid relid, AttrNumber attnum, bool missing_ok)
Definition: lsyscache.c:920
int16 attnum
Definition: pg_attribute.h:74
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:202
Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
Publication * GetPublication(Oid pubid)
bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols)
#define RelationGetRelid(relation)
Definition: rel.h:515
Bitmapset * RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
Definition: relcache.c:5303
@ INDEX_ATTR_BITMAP_IDENTITY_KEY
Definition: relcache.h:71
TupleDesc rd_att
Definition: rel.h:112
Form_pg_class rd_rel
Definition: rel.h:111
bool has_generated_virtual
Definition: tupdesc.h:47
bool has_generated_stored
Definition: tupdesc.h:46
TupleConstr * constr
Definition: tupdesc.h:141
#define FirstLowInvalidHeapAttributeNumber
Definition: sysattr.h:27
static FormData_pg_attribute * TupleDescAttr(TupleDesc tupdesc, int i)
Definition: tupdesc.h:160

References attnum, bms_free(), bms_is_member(), bms_next_member(), check_and_fetch_column_list(), TupleDescData::constr, FirstLowInvalidHeapAttributeNumber, get_attname(), get_attnum(), GetPublication(), GetTopMostAncestorInPublication(), TupleConstr::has_generated_stored, TupleConstr::has_generated_virtual, INDEX_ATTR_BITMAP_IDENTITY_KEY, OidIsValid, RelationData::rd_att, RelationData::rd_rel, RelationGetDescr, RelationGetIndexAttrBitmap(), RelationGetRelid, TupleDescAttr(), and x.

Referenced by RelationBuildPublicationDesc().

◆ pub_rf_contains_invalid_column()

bool pub_rf_contains_invalid_column ( Oid  pubid,
Relation  relation,
List *ancestors,
bool  pubviaroot 
)

Definition at line 270 of file publicationcmds.c.

272{
273 HeapTuple rftuple;
274 Oid relid = RelationGetRelid(relation);
275 Oid publish_as_relid = RelationGetRelid(relation);
276 bool result = false;
277 Datum rfdatum;
278 bool rfisnull;
279
280 /*
281 * FULL means all columns are in the REPLICA IDENTITY, so all columns are
282 * allowed in the row filter and we can skip the validation.
283 */
284 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
285 return false;
286
287 /*
288 * For a partition, if pubviaroot is true, find the topmost ancestor that
289 * is published via this publication as we need to use its row filter
290 * expression to filter the partition's changes.
291 *
292 * Note that even though the row filter used is for an ancestor, the
293 * REPLICA IDENTITY used will be for the actual child table.
294 */
295 if (pubviaroot && relation->rd_rel->relispartition)
296 {
297 publish_as_relid
298 = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
299
300 if (!OidIsValid(publish_as_relid))
301 publish_as_relid = relid;
302 }
303
304 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
305 ObjectIdGetDatum(publish_as_relid),
306 ObjectIdGetDatum(pubid));
307
308 if (!HeapTupleIsValid(rftuple))
309 return false;
310
311 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
312 Anum_pg_publication_rel_prqual,
313 &rfisnull);
314
315 if (!rfisnull)
316 {
317 rf_context context = {0};
318 Node *rfnode;
319 Bitmapset *bms = NULL;
320
321 context.pubviaroot = pubviaroot;
322 context.parentid = publish_as_relid;
323 context.relid = relid;
324
325 /* Remember columns that are part of the REPLICA IDENTITY */
326 bms = RelationGetIndexAttrBitmap(relation,
327 INDEX_ATTR_BITMAP_IDENTITY_KEY);
328
329 context.bms_replident = bms;
330 rfnode = stringToNode(TextDatumGetCString(rfdatum));
331 result = contain_invalid_rfcolumn_walker(rfnode, &context);
332 }
333
334 ReleaseSysCache(rftuple);
335
336 return result;
337}
#define TextDatumGetCString(d)
Definition: builtins.h:98
static bool contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
void * stringToNode(const char *str)
Definition: read.c:90
Definition: nodes.h:135
Bitmapset * bms_replident
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:264
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:595
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:230

References rf_context::bms_replident, contain_invalid_rfcolumn_walker(), GetTopMostAncestorInPublication(), HeapTupleIsValid, INDEX_ATTR_BITMAP_IDENTITY_KEY, ObjectIdGetDatum(), OidIsValid, rf_context::parentid, rf_context::pubviaroot, RelationData::rd_rel, RelationGetIndexAttrBitmap(), RelationGetRelid, ReleaseSysCache(), rf_context::relid, SearchSysCache2(), stringToNode(), SysCacheGetAttr(), and TextDatumGetCString.

Referenced by RelationBuildPublicationDesc().

◆ RemovePublicationById()

void RemovePublicationById ( Oid  pubid)

Definition at line 1641 of file publicationcmds.c.

1642{
1643 Relation rel;
1644 HeapTuple tup;
1645 Form_pg_publication pubform;
1646
1647 rel = table_open(PublicationRelationId, RowExclusiveLock);
1648
1649 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1650 if (!HeapTupleIsValid(tup))
1651 elog(ERROR, "cache lookup failed for publication %u", pubid);
1652
1653 pubform = (Form_pg_publication) GETSTRUCT(tup);
1654
1655 /* Invalidate relcache so that publication info is rebuilt. */
1656 if (pubform->puballtables)
1657 CacheInvalidateRelcacheAll();
1658
1659 CatalogTupleDelete(rel, &tup->t_self);
1660
1661 ReleaseSysCache(tup);
1662
1663 table_close(rel, RowExclusiveLock);
1664}
#define elog(elevel,...)
Definition: elog.h:226
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition: indexing.c:365
ItemPointerData t_self
Definition: htup.h:65
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:220

References CacheInvalidateRelcacheAll(), CatalogTupleDelete(), elog, ERROR, GETSTRUCT(), HeapTupleIsValid, ObjectIdGetDatum(), ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().

◆ RemovePublicationRelById()

void RemovePublicationRelById ( Oid  proid)

Definition at line 1600 of file publicationcmds.c.

1601{
1602 Relation rel;
1603 HeapTuple tup;
1604 Form_pg_publication_rel pubrel;
1605 List *relids = NIL;
1606
1607 rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1608
1609 tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1610
1611 if (!HeapTupleIsValid(tup))
1612 elog(ERROR, "cache lookup failed for publication table %u",
1613 proid);
1614
1615 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1616
1617 /*
1618 * Invalidate relcache so that publication info is rebuilt.
1619 *
1620 * For the partitioned tables, we must invalidate all partitions contained
1621 * in the respective partition hierarchies, not just the one explicitly
1622 * mentioned in the publication. This is required because we implicitly
1623 * publish the child tables when the parent table is published.
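1624 */
1625 relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1626 pubrel->prrelid);
1627
1628 InvalidatePublicationRels(relids);
1629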
1630 CatalogTupleDelete(rel, &tup->t_self);
1631
1632 ReleaseSysCache(tup);
1633
1634 table_close(rel, RowExclusiveLock);
1635}
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
FormData_pg_publication_rel * Form_pg_publication_rel
void InvalidatePublicationRels(List *relids)

References CatalogTupleDelete(), elog, ERROR, GetPubPartitionOptionRelations(), GETSTRUCT(), HeapTupleIsValid, InvalidatePublicationRels(), NIL, ObjectIdGetDatum(), PUBLICATION_PART_ALL, ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().

◆ RemovePublicationSchemaById()

void RemovePublicationSchemaById ( Oid  psoid)

Definition at line 1670 of file publicationcmds.c.

1671{
1672 Relation rel;
1673 HeapTuple tup;
1674 List *schemaRels = NIL;
1675 Form_pg_publication_namespace pubsch;
1676
1677 rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1678
1679 tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1680
1681 if (!HeapTupleIsValid(tup))
1682 elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1683
1684 pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1685
1686 /*
1687 * Invalidate relcache so that publication info is rebuilt. See
1688 * RemovePublicationRelById for why we need to consider all the
1689 * partitions.
1690 */
1691 schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1692 PUBLICATION_PART_ALL);
1693 InvalidatePublicationRels(schemaRels);
1694
1695 CatalogTupleDelete(rel, &tup->t_self);
1696
1697 ReleaseSysCache(tup);
1698
1699 table_close(rel, RowExclusiveLock);
1700}
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
FormData_pg_publication_namespace * Form_pg_publication_namespace

References CatalogTupleDelete(), elog, ERROR, GetSchemaPublicationRelations(), GETSTRUCT(), HeapTupleIsValid, InvalidatePublicationRels(), NIL, ObjectIdGetDatum(), PUBLICATION_PART_ALL, ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().