PostgreSQL Source Code  git master
publicationcmds.h File Reference
Include dependency graph for publicationcmds.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Macros

#define MAX_RELCACHE_INVAL_MSGS   4096
 

Functions

ObjectAddress CreatePublication (ParseState *pstate, CreatePublicationStmt *stmt)
 
void AlterPublication (ParseState *pstate, AlterPublicationStmt *stmt)
 
void RemovePublicationById (Oid pubid)
 
void RemovePublicationRelById (Oid proid)
 
void RemovePublicationSchemaById (Oid psoid)
 
ObjectAddress AlterPublicationOwner (const char *name, Oid newOwnerId)
 
void AlterPublicationOwner_oid (Oid pubid, Oid newOwnerId)
 
void InvalidatePublicationRels (List *relids)
 

Macro Definition Documentation

◆ MAX_RELCACHE_INVAL_MSGS

#define MAX_RELCACHE_INVAL_MSGS   4096

Definition at line 23 of file publicationcmds.h.

Function Documentation

◆ AlterPublication()

void AlterPublication ( ParseState *  pstate,
AlterPublicationStmt *  stmt 
)

Definition at line 693 of file publicationcmds.c.

694 {
695  Relation rel;
696  HeapTuple tup;
697  Form_pg_publication pubform;
698 
699  rel = table_open(PublicationRelationId, RowExclusiveLock);
700 
701  tup = SearchSysCacheCopy1(PUBLICATIONNAME,
702  CStringGetDatum(stmt->pubname));
703 
704  if (!HeapTupleIsValid(tup))
705  ereport(ERROR,
706  (errcode(ERRCODE_UNDEFINED_OBJECT),
707  errmsg("publication \"%s\" does not exist",
708  stmt->pubname)));
709 
710  pubform = (Form_pg_publication) GETSTRUCT(tup);
711 
712  /* must be owner */
713  if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
714  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
715  stmt->pubname);
716 
717  if (stmt->options)
718  AlterPublicationOptions(pstate, stmt, rel, tup);
719  else
720  {
721  List *relations = NIL;
722  List *schemaidlist = NIL;
723 
724  ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
725  &schemaidlist);
726 
727  CheckAlterPublication(stmt, tup, relations, schemaidlist);
728 
729  /*
730  * Lock the publication so nobody else can do anything with it. This
731  * prevents concurrent alter to add table(s) that were already going
732  * to become part of the publication by adding corresponding schema(s)
733  * via this command and similarly it will prevent the concurrent
734  * addition of schema(s) for which there is any corresponding table
735  * being added by this command.
736  */
737  LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
738  AccessExclusiveLock);
739 
740  /*
741  * It is possible that by the time we acquire the lock on publication,
742  * concurrent DDL has removed it. We can test this by checking the
743  * existence of publication.
744  */
745  if (!SearchSysCacheExists1(PUBLICATIONOID,
746  ObjectIdGetDatum(pubform->oid)))
747  ereport(ERROR,
748  errcode(ERRCODE_UNDEFINED_OBJECT),
749  errmsg("publication \"%s\" does not exist",
750  stmt->pubname));
751 
752  AlterPublicationTables(stmt, tup, relations, schemaidlist);
753  AlterPublicationSchemas(stmt, tup, schemaidlist);
754  }
755 
756  /* Cleanup. */
757  heap_freetuple(tup);
758  table_close(rel, RowExclusiveLock);
759 }
@ ACLCHECK_NOT_OWNER
Definition: acl.h:181
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3308
bool pg_publication_ownercheck(Oid pub_oid, Oid roleid)
Definition: aclchk.c:5358
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ERROR
Definition: elog.h:33
#define ereport(elevel,...)
Definition: elog.h:143
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:649
void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:977
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
Oid GetUserId(void)
Definition: miscinit.c:495
@ OBJECT_PUBLICATION
Definition: parsenodes.h:1818
#define NIL
Definition: pg_list.h:65
FormData_pg_publication * Form_pg_publication
#define CStringGetDatum(X)
Definition: postgres.h:622
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
static void AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist)
static void AlterPublicationSchemas(AlterPublicationStmt *stmt, HeapTuple tup, List *schemaidlist)
static void ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, List **rels, List **schemas)
static void CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup, List *tables, List *schemaidlist)
static void AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
Definition: pg_list.h:51
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:177
@ PUBLICATIONOID
Definition: syscache.h:81
@ PUBLICATIONNAME
Definition: syscache.h:78
#define SearchSysCacheExists1(cacheId, key1)
Definition: syscache.h:186
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, AlterPublicationOptions(), AlterPublicationSchemas(), AlterPublicationTables(), CheckAlterPublication(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, GetUserId(), heap_freetuple(), HeapTupleIsValid, LockDatabaseObject(), NIL, OBJECT_PUBLICATION, ObjectIdGetDatum, ObjectsInPublicationToOids(), AlterPublicationStmt::options, pg_publication_ownercheck(), PUBLICATIONNAME, PUBLICATIONOID, AlterPublicationStmt::pubname, AlterPublicationStmt::pubobjects, RowExclusiveLock, SearchSysCacheCopy1, SearchSysCacheExists1, table_close(), and table_open().

Referenced by ProcessUtilitySlow().

◆ AlterPublicationOwner()

ObjectAddress AlterPublicationOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 1222 of file publicationcmds.c.

1223 {
1224  Oid subid;
1225  HeapTuple tup;
1226  Relation rel;
1227  ObjectAddress address;
1228  Form_pg_publication pubform;
1229 
1230  rel = table_open(PublicationRelationId, RowExclusiveLock);
1231 
1233 
1234  if (!HeapTupleIsValid(tup))
1235  ereport(ERROR,
1236  (errcode(ERRCODE_UNDEFINED_OBJECT),
1237  errmsg("publication \"%s\" does not exist", name)));
1238 
1239  pubform = (Form_pg_publication) GETSTRUCT(tup);
1240  subid = pubform->oid;
1241 
1242  AlterPublicationOwner_internal(rel, tup, newOwnerId);
1243 
1244  ObjectAddressSet(address, PublicationRelationId, subid);
1245 
1246  heap_freetuple(tup);
1247 
1249 
1250  return address;
1251 }
const char * name
Definition: encode.c:561
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
unsigned int Oid
Definition: postgres_ext.h:31
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)

References AlterPublicationOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), HeapTupleIsValid, name, ObjectAddressSet, PUBLICATIONNAME, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

◆ AlterPublicationOwner_oid()

void AlterPublicationOwner_oid ( Oid  pubid,
Oid  newOwnerId 
)

Definition at line 1257 of file publicationcmds.c.

1258 {
1259  HeapTuple tup;
1260  Relation rel;
1261 
1262  rel = table_open(PublicationRelationId, RowExclusiveLock);
1263 
1265 
1266  if (!HeapTupleIsValid(tup))
1267  ereport(ERROR,
1268  (errcode(ERRCODE_UNDEFINED_OBJECT),
1269  errmsg("publication with OID %u does not exist", subid)));
1270 
1271  AlterPublicationOwner_internal(rel, tup, newOwnerId);
1272 
1273  heap_freetuple(tup);
1274 
1276 }

References AlterPublicationOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONOID, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by shdepReassignOwned().

◆ CreatePublication()

ObjectAddress CreatePublication ( ParseState *  pstate,
CreatePublicationStmt *  stmt 
)

Definition at line 241 of file publicationcmds.c.

242 {
243  Relation rel;
244  ObjectAddress myself;
245  Oid puboid;
246  bool nulls[Natts_pg_publication];
247  Datum values[Natts_pg_publication];
248  HeapTuple tup;
249  bool publish_given;
250  PublicationActions pubactions;
251  bool publish_via_partition_root_given;
252  bool publish_via_partition_root;
253  AclResult aclresult;
254  List *relations = NIL;
255  List *schemaidlist = NIL;
256 
257  /* must have CREATE privilege on database */
259  if (aclresult != ACLCHECK_OK)
260  aclcheck_error(aclresult, OBJECT_DATABASE,
262 
263  /* FOR ALL TABLES requires superuser */
264  if (stmt->for_all_tables && !superuser())
265  ereport(ERROR,
266  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
267  errmsg("must be superuser to create FOR ALL TABLES publication")));
268 
269  rel = table_open(PublicationRelationId, RowExclusiveLock);
270 
271  /* Check if name is used */
272  puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
273  CStringGetDatum(stmt->pubname));
274  if (OidIsValid(puboid))
275  {
276  ereport(ERROR,
278  errmsg("publication \"%s\" already exists",
279  stmt->pubname)));
280  }
281 
282  /* Form a tuple. */
283  memset(values, 0, sizeof(values));
284  memset(nulls, false, sizeof(nulls));
285 
286  values[Anum_pg_publication_pubname - 1] =
288  values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
289 
291  stmt->options,
292  &publish_given, &pubactions,
293  &publish_via_partition_root_given,
294  &publish_via_partition_root);
295 
296  puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
297  Anum_pg_publication_oid);
298  values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
299  values[Anum_pg_publication_puballtables - 1] =
301  values[Anum_pg_publication_pubinsert - 1] =
302  BoolGetDatum(pubactions.pubinsert);
303  values[Anum_pg_publication_pubupdate - 1] =
304  BoolGetDatum(pubactions.pubupdate);
305  values[Anum_pg_publication_pubdelete - 1] =
306  BoolGetDatum(pubactions.pubdelete);
307  values[Anum_pg_publication_pubtruncate - 1] =
308  BoolGetDatum(pubactions.pubtruncate);
309  values[Anum_pg_publication_pubviaroot - 1] =
310  BoolGetDatum(publish_via_partition_root);
311 
312  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
313 
314  /* Insert tuple into catalog. */
315  CatalogTupleInsert(rel, tup);
316  heap_freetuple(tup);
317 
318  recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
319 
320  ObjectAddressSet(myself, PublicationRelationId, puboid);
321 
322  /* Make the changes visible. */
324 
325  /* Associate objects with the publication. */
326  if (stmt->for_all_tables)
327  {
328  /* Invalidate relcache so that publication info is rebuilt. */
330  }
331  else
332  {
333  ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
334  &schemaidlist);
335 
336  /* FOR ALL TABLES IN SCHEMA requires superuser */
337  if (list_length(schemaidlist) > 0 && !superuser())
338  ereport(ERROR,
339  errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
340  errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication"));
341 
342  if (list_length(relations) > 0)
343  {
344  List *rels;
345 
346  rels = OpenTableList(relations);
347  CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
349  PublicationAddTables(puboid, rels, true, NULL);
350  CloseTableList(rels);
351  }
352 
353  if (list_length(schemaidlist) > 0)
354  {
355  /*
356  * Schema lock is held until the publication is created to prevent
357  * concurrent schema deletion.
358  */
359  LockSchemaList(schemaidlist);
360  PublicationAddSchemas(puboid, schemaidlist, true, NULL);
361  }
362  }
363 
365 
366  InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
367 
369  {
371  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
372  errmsg("wal_level is insufficient to publish logical changes"),
373  errhint("Set wal_level to logical before creating subscriptions.")));
374  }
375 
376  return myself;
377 }
AclResult
Definition: acl.h:178
@ ACLCHECK_OK
Definition: acl.h:179
AclResult pg_database_aclcheck(Oid db_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4708
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define OidIsValid(objectId)
Definition: c.h:710
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:381
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2106
int errhint(const char *fmt,...)
Definition: elog.c:1151
#define WARNING
Definition: elog.h:30
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:631
Oid MyDatabaseId
Definition: globals.c:88
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:221
void CacheInvalidateRelcacheAll(void)
Definition: inval.c:1386
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
@ PUBLICATIONOBJ_TABLE
Definition: parsenodes.h:3652
@ OBJECT_DATABASE
Definition: parsenodes.h:1798
#define ACL_CREATE
Definition: parsenodes.h:91
static int list_length(const List *l)
Definition: pg_list.h:149
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:164
uintptr_t Datum
Definition: postgres.h:411
#define BoolGetDatum(X)
Definition: postgres.h:446
static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt)
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
static void CloseTableList(List *rels)
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root)
static void CheckObjSchemaNotAlreadyInPublication(List *rels, List *schemaidlist, PublicationObjSpecType checkobjtype)
static List * OpenTableList(List *tables)
static void LockSchemaList(List *schemalist)
#define RelationGetDescr(relation)
Definition: rel.h:504
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
bool superuser(void)
Definition: superuser.c:46
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:195
void CommandCounterIncrement(void)
Definition: xact.c:1073
int wal_level
Definition: xlog.c:112
@ WAL_LEVEL_LOGICAL
Definition: xlog.h:123

References ACL_CREATE, aclcheck_error(), ACLCHECK_OK, BoolGetDatum, CacheInvalidateRelcacheAll(), CatalogTupleInsert(), CheckObjSchemaNotAlreadyInPublication(), CloseTableList(), CommandCounterIncrement(), CStringGetDatum, DirectFunctionCall1, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg(), ERROR, CreatePublicationStmt::for_all_tables, get_database_name(), GetNewOidWithIndex(), GetSysCacheOid1, GetUserId(), heap_form_tuple(), heap_freetuple(), InvokeObjectPostCreateHook, list_length(), LockSchemaList(), MyDatabaseId, namein(), NIL, OBJECT_DATABASE, ObjectAddressSet, ObjectIdGetDatum, ObjectsInPublicationToOids(), OidIsValid, OpenTableList(), CreatePublicationStmt::options, parse_publication_options(), pg_database_aclcheck(), PublicationActions::pubdelete, PublicationActions::pubinsert, PublicationAddSchemas(), PublicationAddTables(), PUBLICATIONNAME, PUBLICATIONOBJ_TABLE, CreatePublicationStmt::pubname, CreatePublicationStmt::pubobjects, PublicationActions::pubtruncate, PublicationActions::pubupdate, recordDependencyOnOwner(), RelationGetDescr, RowExclusiveLock, superuser(), table_close(), table_open(), values, wal_level, WAL_LEVEL_LOGICAL, and WARNING.

Referenced by ProcessUtilitySlow().

◆ InvalidatePublicationRels()

void InvalidatePublicationRels ( List *  relids)

Definition at line 473 of file publicationcmds.c.

474 {
475  /*
476  * We don't want to send too many individual messages, at some point it's
477  * cheaper to just reset whole relcache.
478  */
479  if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
480  {
481  ListCell *lc;
482 
483  foreach(lc, relids)
484  CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
485  }
486  else
487  CacheInvalidateRelcacheAll();
488 }
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1421
#define lfirst_oid(lc)
Definition: pg_list.h:171
#define MAX_RELCACHE_INVAL_MSGS

References CacheInvalidateRelcacheAll(), CacheInvalidateRelcacheByRelid(), lfirst_oid, list_length(), and MAX_RELCACHE_INVAL_MSGS.

Referenced by AlterPublicationOptions(), publication_add_relation(), publication_add_schema(), RemovePublicationRelById(), and RemovePublicationSchemaById().

◆ RemovePublicationById()

void RemovePublicationById ( Oid  pubid)

Definition at line 806 of file publicationcmds.c.

807 {
808  Relation rel;
809  HeapTuple tup;
810  Form_pg_publication pubform;
811 
812  rel = table_open(PublicationRelationId, RowExclusiveLock);
813 
815  if (!HeapTupleIsValid(tup))
816  elog(ERROR, "cache lookup failed for publication %u", pubid);
817 
818  pubform = (Form_pg_publication) GETSTRUCT(tup);
819 
820  /* Invalidate relcache so that publication info is rebuilt. */
821  if (pubform->puballtables)
823 
824  CatalogTupleDelete(rel, &tup->t_self);
825 
826  ReleaseSysCache(tup);
827 
829 }
#define elog(elevel,...)
Definition: elog.h:218
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:350
ItemPointerData t_self
Definition: htup.h:65
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1198
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1150

References CacheInvalidateRelcacheAll(), CatalogTupleDelete(), elog, ERROR, GETSTRUCT, HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONOID, ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().

◆ RemovePublicationRelById()

void RemovePublicationRelById ( Oid  proid)

Definition at line 765 of file publicationcmds.c.

766 {
767  Relation rel;
768  HeapTuple tup;
770  List *relids = NIL;
771 
772  rel = table_open(PublicationRelRelationId, RowExclusiveLock);
773 
775 
776  if (!HeapTupleIsValid(tup))
777  elog(ERROR, "cache lookup failed for publication table %u",
778  proid);
779 
780  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
781 
782  /*
783  * Invalidate relcache so that publication info is rebuilt.
784  *
785  * For the partitioned tables, we must invalidate all partitions contained
786  * in the respective partition hierarchies, not just the one explicitly
787  * mentioned in the publication. This is required because we implicitly
788  * publish the child tables when the parent table is published.
789  */
791  pubrel->prrelid);
792 
794 
795  CatalogTupleDelete(rel, &tup->t_self);
796 
797  ReleaseSysCache(tup);
798 
800 }
List * GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid)
@ PUBLICATION_PART_ALL
FormData_pg_publication_rel * Form_pg_publication_rel
void InvalidatePublicationRels(List *relids)
@ PUBLICATIONREL
Definition: syscache.h:82

References CatalogTupleDelete(), elog, ERROR, GetPubPartitionOptionRelations(), GETSTRUCT, HeapTupleIsValid, InvalidatePublicationRels(), NIL, ObjectIdGetDatum, PUBLICATION_PART_ALL, PUBLICATIONREL, ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().

◆ RemovePublicationSchemaById()

void RemovePublicationSchemaById ( Oid  psoid)

Definition at line 835 of file publicationcmds.c.

836 {
837  Relation rel;
838  HeapTuple tup;
839  List *schemaRels = NIL;
841 
842  rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
843 
845 
846  if (!HeapTupleIsValid(tup))
847  elog(ERROR, "cache lookup failed for publication schema %u", psoid);
848 
850 
851  /*
852  * Invalidate relcache so that publication info is rebuilt. See
853  * RemovePublicationRelById for why we need to consider all the
854  * partitions.
855  */
856  schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
858  InvalidatePublicationRels(schemaRels);
859 
860  CatalogTupleDelete(rel, &tup->t_self);
861 
862  ReleaseSysCache(tup);
863 
865 }
List * GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
FormData_pg_publication_namespace * Form_pg_publication_namespace
@ PUBLICATIONNAMESPACE
Definition: syscache.h:79

References CatalogTupleDelete(), elog, ERROR, GetSchemaPublicationRelations(), GETSTRUCT, HeapTupleIsValid, InvalidatePublicationRels(), NIL, ObjectIdGetDatum, PUBLICATION_PART_ALL, PUBLICATIONNAMESPACE, ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().