PostgreSQL Source Code  git master
publicationcmds.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/publicationcmds.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
Include dependency graph for publicationcmds.c:

Go to the source code of this file.

Macros

#define MAX_RELCACHE_INVAL_MSGS   4096
 

Functions

static List * OpenTableList (List *tables)
 
static void CloseTableList (List *rels)
 
static void PublicationAddTables (Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
 
static void PublicationDropTables (Oid pubid, List *rels, bool missing_ok)
 
static void parse_publication_options (ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root)
 
ObjectAddress CreatePublication (ParseState *pstate, CreatePublicationStmt *stmt)
 
static void AlterPublicationOptions (ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
 
static void AlterPublicationTables (AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
 
void AlterPublication (ParseState *pstate, AlterPublicationStmt *stmt)
 
void RemovePublicationRelById (Oid proid)
 
static void AlterPublicationOwner_internal (Relation rel, HeapTuple tup, Oid newOwnerId)
 
ObjectAddress AlterPublicationOwner (const char *name, Oid newOwnerId)
 
void AlterPublicationOwner_oid (Oid subid, Oid newOwnerId)
 

Macro Definition Documentation

◆ MAX_RELCACHE_INVAL_MSGS

#define MAX_RELCACHE_INVAL_MSGS   4096

Definition at line 49 of file publicationcmds.c.

Referenced by AlterPublicationOptions().

Function Documentation

◆ AlterPublication()

void AlterPublication ( ParseState * pstate,
AlterPublicationStmt * stmt 
)

Definition at line 436 of file publicationcmds.c.

References aclcheck_error(), ACLCHECK_NOT_OWNER, AlterPublicationOptions(), AlterPublicationTables(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, GetUserId(), heap_freetuple(), HeapTupleIsValid, OBJECT_PUBLICATION, AlterPublicationStmt::options, pg_publication_ownercheck(), PUBLICATIONNAME, AlterPublicationStmt::pubname, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ProcessUtilitySlow().

437 {
438  Relation rel;
439  HeapTuple tup;
440  Form_pg_publication pubform;
441 
442  rel = table_open(PublicationRelationId, RowExclusiveLock);
443 
444  tup = SearchSysCacheCopy1(PUBLICATIONNAME,
445  CStringGetDatum(stmt->pubname));
446 
447  if (!HeapTupleIsValid(tup))
448  ereport(ERROR,
449  (errcode(ERRCODE_UNDEFINED_OBJECT),
450  errmsg("publication \"%s\" does not exist",
451  stmt->pubname)));
452 
453  pubform = (Form_pg_publication) GETSTRUCT(tup);
454 
455  /* must be owner */
456  if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
457  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
458  stmt->pubname);
459 
460  if (stmt->options)
461  AlterPublicationOptions(pstate, stmt, rel, tup);
462  else
463  AlterPublicationTables(stmt, rel, tup);
464 
465  /* Cleanup. */
466  heap_freetuple(tup);
467  table_close(rel, RowExclusiveLock);
468 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
Oid GetUserId(void)
Definition: miscinit.c:478
static void AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
bool pg_publication_ownercheck(Oid pub_oid, Oid roleid)
Definition: aclchk.c:5356
int errcode(int sqlerrcode)
Definition: elog.c:698
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3308
#define ERROR
Definition: elog.h:46
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:622
static void AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
#define ereport(elevel,...)
Definition: elog.h:157
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:175
int errmsg(const char *fmt,...)
Definition: elog.c:909
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationOptions()

static void AlterPublicationOptions ( ParseState * pstate,
AlterPublicationStmt * stmt,
Relation  rel,
HeapTuple  tup 
)
static

Definition at line 257 of file publicationcmds.c.

References BoolGetDatum, CacheInvalidateRelcacheAll(), CacheInvalidateRelcacheByRelid(), CatalogTupleUpdate(), CommandCounterIncrement(), EventTriggerCollectSimpleCommand(), GetPublicationRelations(), GETSTRUCT, heap_modify_tuple(), InvalidObjectAddress, InvokeObjectPostAlterHook, lfirst_oid, list_length(), MAX_RELCACHE_INVAL_MSGS, ObjectAddressSet, AlterPublicationStmt::options, parse_publication_options(), PublicationActions::pubdelete, PublicationActions::pubinsert, PUBLICATION_PART_ALL, PublicationActions::pubtruncate, PublicationActions::pubupdate, RelationGetDescr, HeapTupleData::t_self, and values.

Referenced by AlterPublication().

259 {
260  bool nulls[Natts_pg_publication];
261  bool replaces[Natts_pg_publication];
262  Datum values[Natts_pg_publication];
263  bool publish_given;
264  PublicationActions pubactions;
265  bool publish_via_partition_root_given;
266  bool publish_via_partition_root;
267  ObjectAddress obj;
268  Form_pg_publication pubform;
269 
270  parse_publication_options(pstate,
271  stmt->options,
272  &publish_given, &pubactions,
273  &publish_via_partition_root_given,
274  &publish_via_partition_root);
275 
276  /* Everything ok, form a new tuple. */
277  memset(values, 0, sizeof(values));
278  memset(nulls, false, sizeof(nulls));
279  memset(replaces, false, sizeof(replaces));
280 
281  if (publish_given)
282  {
283  values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
284  replaces[Anum_pg_publication_pubinsert - 1] = true;
285 
286  values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
287  replaces[Anum_pg_publication_pubupdate - 1] = true;
288 
289  values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
290  replaces[Anum_pg_publication_pubdelete - 1] = true;
291 
292  values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
293  replaces[Anum_pg_publication_pubtruncate - 1] = true;
294  }
295 
296  if (publish_via_partition_root_given)
297  {
298  values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
299  replaces[Anum_pg_publication_pubviaroot - 1] = true;
300  }
301 
302  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
303  replaces);
304 
305  /* Update the catalog. */
306  CatalogTupleUpdate(rel, &tup->t_self, tup);
307 
308  CommandCounterIncrement();
309 
310  pubform = (Form_pg_publication) GETSTRUCT(tup);
311 
312  /* Invalidate the relcache. */
313  if (pubform->puballtables)
314  {
315  CacheInvalidateRelcacheAll();
316  }
317  else
318  {
319  /*
320  * For any partitioned tables contained in the publication, we must
321  * invalidate all partitions contained in the respective partition
322  * trees, not just those explicitly mentioned in the publication.
323  */
324  List *relids = GetPublicationRelations(pubform->oid,
325  PUBLICATION_PART_ALL);
326 
327  /*
328  * We don't want to send too many individual messages, at some point
329  * it's cheaper to just reset whole relcache.
330  */
331  if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
332  {
333  ListCell *lc;
334 
335  foreach(lc, relids)
336  {
337  Oid relid = lfirst_oid(lc);
338 
339  CacheInvalidateRelcacheByRelid(relid);
340  }
341  }
342  else
343  CacheInvalidateRelcacheAll();
344  }
345 
346  ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
347  EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
348  (Node *) stmt);
349 
350  InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
351 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
#define RelationGetDescr(relation)
Definition: rel.h:503
Definition: nodes.h:539
unsigned int Oid
Definition: postgres_ext.h:31
#define MAX_RELCACHE_INVAL_MSGS
ItemPointerData t_self
Definition: htup.h:65
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1338
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
void CacheInvalidateRelcacheAll(void)
Definition: inval.c:1303
uintptr_t Datum
Definition: postgres.h:411
void CommandCounterIncrement(void)
Definition: xact.c:1021
#define BoolGetDatum(X)
Definition: postgres.h:446
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
static int list_length(const List *l)
Definition: pg_list.h:149
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:166
const ObjectAddress InvalidObjectAddress
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
FormData_pg_publication * Form_pg_publication
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
Definition: pg_list.h:50
#define lfirst_oid(lc)
Definition: pg_list.h:171

◆ AlterPublicationOwner()

ObjectAddress AlterPublicationOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 723 of file publicationcmds.c.

References AlterPublicationOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), HeapTupleIsValid, ObjectAddressSet, PUBLICATIONNAME, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

724 {
725  Oid subid;
726  HeapTuple tup;
727  Relation rel;
728  ObjectAddress address;
729  Form_pg_publication pubform;
730 
731  rel = table_open(PublicationRelationId, RowExclusiveLock);
732 
733  tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
734 
735  if (!HeapTupleIsValid(tup))
736  ereport(ERROR,
737  (errcode(ERRCODE_UNDEFINED_OBJECT),
738  errmsg("publication \"%s\" does not exist", name)));
739 
740  pubform = (Form_pg_publication) GETSTRUCT(tup);
741  subid = pubform->oid;
742 
743  AlterPublicationOwner_internal(rel, tup, newOwnerId);
744 
745  ObjectAddressSet(address, PublicationRelationId, subid);
746 
747  heap_freetuple(tup);
748 
749  table_close(rel, RowExclusiveLock);
750 
751  return address;
752 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
int errcode(int sqlerrcode)
Definition: elog.c:698
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:46
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:622
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define ereport(elevel,...)
Definition: elog.h:157
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
const char * name
Definition: encode.c:515
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:175
int errmsg(const char *fmt,...)
Definition: elog.c:909
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationOwner_internal()

static void AlterPublicationOwner_internal ( Relation  rel,
HeapTuple  tup,
Oid  newOwnerId 
)
static

Definition at line 672 of file publicationcmds.c.

References ACL_CREATE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, CatalogTupleUpdate(), changeDependencyOnOwner(), check_is_member_of_role(), ereport, errcode(), errhint(), errmsg(), ERROR, get_database_name(), GETSTRUCT, GetUserId(), InvokeObjectPostAlterHook, MyDatabaseId, NameStr, OBJECT_DATABASE, OBJECT_PUBLICATION, pg_database_aclcheck(), pg_publication_ownercheck(), superuser(), superuser_arg(), and HeapTupleData::t_self.

Referenced by AlterPublicationOwner(), and AlterPublicationOwner_oid().

673 {
674  Form_pg_publication form;
675 
676  form = (Form_pg_publication) GETSTRUCT(tup);
677 
678  if (form->pubowner == newOwnerId)
679  return;
680 
681  if (!superuser())
682  {
683  AclResult aclresult;
684 
685  /* Must be owner */
686  if (!pg_publication_ownercheck(form->oid, GetUserId()))
687  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
688  NameStr(form->pubname));
689 
690  /* Must be able to become new owner */
691  check_is_member_of_role(GetUserId(), newOwnerId);
692 
693  /* New owner must have CREATE privilege on database */
694  aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
695  if (aclresult != ACLCHECK_OK)
696  aclcheck_error(aclresult, OBJECT_DATABASE,
697  get_database_name(MyDatabaseId));
698 
699  if (form->puballtables && !superuser_arg(newOwnerId))
700  ereport(ERROR,
701  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
702  errmsg("permission denied to change owner of publication \"%s\"",
703  NameStr(form->pubname)),
704  errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
705  }
706 
707  form->pubowner = newOwnerId;
708  CatalogTupleUpdate(rel, &tup->t_self, tup);
709 
710  /* Update owner dependency reference */
711  changeDependencyOnOwner(PublicationRelationId,
712  form->oid,
713  newOwnerId);
714 
715  InvokeObjectPostAlterHook(PublicationRelationId,
716  form->oid, 0);
717 }
int errhint(const char *fmt,...)
Definition: elog.c:1156
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
Oid GetUserId(void)
Definition: miscinit.c:478
bool pg_publication_ownercheck(Oid pub_oid, Oid roleid)
Definition: aclchk.c:5356
int errcode(int sqlerrcode)
Definition: elog.c:698
bool superuser(void)
Definition: superuser.c:46
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:311
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3308
#define ERROR
Definition: elog.h:46
#define ACL_CREATE
Definition: parsenodes.h:92
ItemPointerData t_self
Definition: htup.h:65
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2113
void check_is_member_of_role(Oid member, Oid role)
Definition: acl.c:4893
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
AclResult
Definition: acl.h:177
Oid MyDatabaseId
Definition: globals.c:88
#define ereport(elevel,...)
Definition: elog.h:157
AclResult pg_database_aclcheck(Oid db_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4706
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define NameStr(name)
Definition: c.h:681
FormData_pg_publication * Form_pg_publication

◆ AlterPublicationOwner_oid()

void AlterPublicationOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 758 of file publicationcmds.c.

References AlterPublicationOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONOID, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by shdepReassignOwned().

759 {
760  HeapTuple tup;
761  Relation rel;
762 
763  rel = table_open(PublicationRelationId, RowExclusiveLock);
764 
765  tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
766 
767  if (!HeapTupleIsValid(tup))
768  ereport(ERROR,
769  (errcode(ERRCODE_UNDEFINED_OBJECT),
770  errmsg("publication with OID %u does not exist", subid)));
771 
772  AlterPublicationOwner_internal(rel, tup, newOwnerId);
773 
774  heap_freetuple(tup);
775 
776  table_close(rel, RowExclusiveLock);
777 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
int errcode(int sqlerrcode)
Definition: elog.c:698
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
#define RowExclusiveLock
Definition: lockdefs.h:38
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define ereport(elevel,...)
Definition: elog.h:157
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:175
int errmsg(const char *fmt,...)
Definition: elog.c:909
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationTables()

static void AlterPublicationTables ( AlterPublicationStmt * stmt,
Relation  rel,
HeapTuple  tup 
)
static

Definition at line 357 of file publicationcmds.c.

References Assert, CloseTableList(), DEFELEM_ADD, DEFELEM_DROP, ereport, errcode(), errdetail(), errmsg(), ERROR, GetPublicationRelations(), GETSTRUCT, lappend(), lfirst, lfirst_oid, list_length(), NameStr, NIL, OpenTableList(), PUBLICATION_PART_ROOT, PublicationAddTables(), PublicationDropTables(), RelationGetRelid, ShareUpdateExclusiveLock, table_open(), AlterPublicationStmt::tableAction, and AlterPublicationStmt::tables.

Referenced by AlterPublication().

359 {
360  List *rels = NIL;
361  Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
362  Oid pubid = pubform->oid;
363 
364  /* Check that user is allowed to manipulate the publication tables. */
365  if (pubform->puballtables)
366  ereport(ERROR,
367  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
368  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
369  NameStr(pubform->pubname)),
370  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
371 
372  Assert(list_length(stmt->tables) > 0);
373 
374  rels = OpenTableList(stmt->tables);
375 
376  if (stmt->tableAction == DEFELEM_ADD)
377  PublicationAddTables(pubid, rels, false, stmt);
378  else if (stmt->tableAction == DEFELEM_DROP)
379  PublicationDropTables(pubid, rels, false);
380  else /* DEFELEM_SET */
381  {
382  List *oldrelids = GetPublicationRelations(pubid,
383  PUBLICATION_PART_ROOT);
384  List *delrels = NIL;
385  ListCell *oldlc;
386 
387  /* Calculate which relations to drop. */
388  foreach(oldlc, oldrelids)
389  {
390  Oid oldrelid = lfirst_oid(oldlc);
391  ListCell *newlc;
392  bool found = false;
393 
394  foreach(newlc, rels)
395  {
396  Relation newrel = (Relation) lfirst(newlc);
397 
398  if (RelationGetRelid(newrel) == oldrelid)
399  {
400  found = true;
401  break;
402  }
403  }
404 
405  if (!found)
406  {
407  Relation oldrel = table_open(oldrelid,
408  ShareUpdateExclusiveLock);
409 
410  delrels = lappend(delrels, oldrel);
411  }
412  }
413 
414  /* And drop them. */
415  PublicationDropTables(pubid, delrels, true);
416 
417  /*
418  * Don't bother calculating the difference for adding, we'll catch and
419  * skip existing ones when doing catalog update.
420  */
421  PublicationAddTables(pubid, rels, true, stmt);
422 
423  CloseTableList(delrels);
424  }
425 
426  CloseTableList(rels);
427 }
#define NIL
Definition: pg_list.h:65
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
int errcode(int sqlerrcode)
Definition: elog.c:698
unsigned int Oid
Definition: postgres_ext.h:31
static void CloseTableList(List *rels)
struct RelationData * Relation
Definition: relcache.h:27
#define ERROR
Definition: elog.h:46
int errdetail(const char *fmt,...)
Definition: elog.c:1042
List * lappend(List *list, void *datum)
Definition: list.c:336
DefElemAction tableAction
Definition: parsenodes.h:3647
#define ereport(elevel,...)
Definition: elog.h:157
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
static int list_length(const List *l)
Definition: pg_list.h:149
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define NameStr(name)
Definition: c.h:681
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:477
static List * OpenTableList(List *tables)
#define lfirst_oid(lc)
Definition: pg_list.h:171

◆ CloseTableList()

static void CloseTableList ( List * rels)
static

Definition at line 589 of file publicationcmds.c.

References lfirst, NoLock, and table_close().

Referenced by AlterPublicationTables(), and CreatePublication().

590 {
591  ListCell *lc;
592 
593  foreach(lc, rels)
594  {
595  Relation rel = (Relation) lfirst(lc);
596 
597  table_close(rel, NoLock);
598  }
599 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
struct RelationData * Relation
Definition: relcache.h:27
#define NoLock
Definition: lockdefs.h:34
#define lfirst(lc)
Definition: pg_list.h:169

◆ CreatePublication()

ObjectAddress CreatePublication ( ParseState * pstate,
CreatePublicationStmt * stmt 
)

Definition at line 145 of file publicationcmds.c.

References ACL_CREATE, aclcheck_error(), ACLCHECK_OK, Assert, BoolGetDatum, CatalogTupleInsert(), CloseTableList(), CommandCounterIncrement(), CStringGetDatum, DirectFunctionCall1, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg(), ERROR, CreatePublicationStmt::for_all_tables, get_database_name(), GetNewOidWithIndex(), GetSysCacheOid1, GetUserId(), heap_form_tuple(), heap_freetuple(), InvokeObjectPostCreateHook, list_length(), MyDatabaseId, namein(), OBJECT_DATABASE, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, OpenTableList(), CreatePublicationStmt::options, parse_publication_options(), pg_database_aclcheck(), PublicationActions::pubdelete, PublicationActions::pubinsert, PublicationAddTables(), PUBLICATIONNAME, CreatePublicationStmt::pubname, PublicationActions::pubtruncate, PublicationActions::pubupdate, recordDependencyOnOwner(), RelationGetDescr, RowExclusiveLock, superuser(), table_close(), table_open(), CreatePublicationStmt::tables, values, wal_level, WAL_LEVEL_LOGICAL, and WARNING.

Referenced by ProcessUtilitySlow().

146 {
147  Relation rel;
148  ObjectAddress myself;
149  Oid puboid;
150  bool nulls[Natts_pg_publication];
151  Datum values[Natts_pg_publication];
152  HeapTuple tup;
153  bool publish_given;
154  PublicationActions pubactions;
155  bool publish_via_partition_root_given;
156  bool publish_via_partition_root;
157  AclResult aclresult;
158 
159  /* must have CREATE privilege on database */
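160  aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
161  if (aclresult != ACLCHECK_OK)
162  aclcheck_error(aclresult, OBJECT_DATABASE,
163  get_database_name(MyDatabaseId));
164 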
165  /* FOR ALL TABLES requires superuser */
166  if (stmt->for_all_tables && !superuser())
167  ereport(ERROR,
168  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
169  errmsg("must be superuser to create FOR ALL TABLES publication")));
170 
171  rel = table_open(PublicationRelationId, RowExclusiveLock);
172 
173  /* Check if name is used */
174  puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
175  CStringGetDatum(stmt->pubname));
176  if (OidIsValid(puboid))
177  {
178  ereport(ERROR,
179  (errcode(ERRCODE_DUPLICATE_OBJECT),
180  errmsg("publication \"%s\" already exists",
181  stmt->pubname)));
182  }
183 
184  /* Form a tuple. */
185  memset(values, 0, sizeof(values));
186  memset(nulls, false, sizeof(nulls));
187 
188  values[Anum_pg_publication_pubname - 1] =
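189  DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
190  values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
191 
192  parse_publication_options(pstate,
193  stmt->options,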
194  &publish_given, &pubactions,
195  &publish_via_partition_root_given,
196  &publish_via_partition_root);
197 
198  puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
199  Anum_pg_publication_oid);
200  values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
201  values[Anum_pg_publication_puballtables - 1] =
202  BoolGetDatum(stmt->for_all_tables);
203  values[Anum_pg_publication_pubinsert - 1] =
204  BoolGetDatum(pubactions.pubinsert);
205  values[Anum_pg_publication_pubupdate - 1] =
206  BoolGetDatum(pubactions.pubupdate);
207  values[Anum_pg_publication_pubdelete - 1] =
208  BoolGetDatum(pubactions.pubdelete);
209  values[Anum_pg_publication_pubtruncate - 1] =
210  BoolGetDatum(pubactions.pubtruncate);
211  values[Anum_pg_publication_pubviaroot - 1] =
212  BoolGetDatum(publish_via_partition_root);
213 
214  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
215 
216  /* Insert tuple into catalog. */
217  CatalogTupleInsert(rel, tup);
218  heap_freetuple(tup);
219 
220  recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
221 
222  ObjectAddressSet(myself, PublicationRelationId, puboid);
223 
224  /* Make the changes visible. */
225  CommandCounterIncrement();
226 
227  if (stmt->tables)
228  {
229  List *rels;
230 
231  Assert(list_length(stmt->tables) > 0);
232 
233  rels = OpenTableList(stmt->tables);
234  PublicationAddTables(puboid, rels, true, NULL);
235  CloseTableList(rels);
236  }
237 
238  table_close(rel, RowExclusiveLock);
239 
240  InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
241 
242  if (wal_level != WAL_LEVEL_LOGICAL)
243  {
244  ereport(WARNING,
245  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
246  errmsg("wal_level is insufficient to publish logical changes"),
247  errhint("Set wal_level to logical before creating subscriptions.")));
248  }
249 
250  return myself;
251 }
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:381
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
int errhint(const char *fmt,...)
Definition: elog.c:1156
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
#define RelationGetDescr(relation)
Definition: rel.h:503
Oid GetUserId(void)
Definition: miscinit.c:478
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:193
int wal_level
Definition: xlog.c:108
int errcode(int sqlerrcode)
Definition: elog.c:698
bool superuser(void)
Definition: superuser.c:46
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:626
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:163
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:710
static void CloseTableList(List *rels)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3308
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
#define ACL_CREATE
Definition: parsenodes.h:92
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2113
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:622
static void parse_publication_options(ParseState *pstate, List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root)
#define WARNING
Definition: elog.h:40
AclResult
Definition: acl.h:177
uintptr_t Datum
Definition: postgres.h:411
void CommandCounterIncrement(void)
Definition: xact.c:1021
Oid MyDatabaseId
Definition: globals.c:88
#define BoolGetDatum(X)
Definition: postgres.h:446
#define ereport(elevel,...)
Definition: elog.h:157
AclResult pg_database_aclcheck(Oid db_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4706
#define Assert(condition)
Definition: c.h:804
static int list_length(const List *l)
Definition: pg_list.h:149
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:166
int errmsg(const char *fmt,...)
Definition: elog.c:909
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
Definition: pg_list.h:50
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:221
static List * OpenTableList(List *tables)

◆ OpenTableList()

static List * OpenTableList ( List * tables)
static

Definition at line 506 of file publicationcmds.c.

References CHECK_FOR_INTERRUPTS, find_all_inheritors(), RangeVar::inh, lappend(), lappend_oid(), lfirst_node, lfirst_oid, list_free(), list_member_oid(), NIL, NoLock, RelationData::rd_rel, RelationGetRelid, ShareUpdateExclusiveLock, table_close(), table_open(), and table_openrv().

Referenced by AlterPublicationTables(), and CreatePublication().

507 {
508  List *relids = NIL;
509  List *rels = NIL;
510  ListCell *lc;
511 
512  /*
513  * Open, share-lock, and check all the explicitly-specified relations
514  */
515  foreach(lc, tables)
516  {
517  RangeVar *rv = lfirst_node(RangeVar, lc);
518  bool recurse = rv->inh;
519  Relation rel;
520  Oid myrelid;
521 
522  /* Allow query cancel in case this takes a long time */
523  CHECK_FOR_INTERRUPTS();
524 
525  rel = table_openrv(rv, ShareUpdateExclusiveLock);
526  myrelid = RelationGetRelid(rel);
527 
528  /*
529  * Filter out duplicates if user specifies "foo, foo".
530  *
531  * Note that this algorithm is known to not be very efficient (O(N^2))
532  * but given that it only works on list of tables given to us by user
533  * it's deemed acceptable.
534  */
535  if (list_member_oid(relids, myrelid))
536  {
537  table_close(rel, ShareUpdateExclusiveLock);
538  continue;
539  }
540 
541  rels = lappend(rels, rel);
542  relids = lappend_oid(relids, myrelid);
543 
544  /*
545  * Add children of this rel, if requested, so that they too are added
546  * to the publication. A partitioned table can't have any inheritance
547  * children other than its partitions, which need not be explicitly
548  * added to the publication.
549  */
550  if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
551  {
552  List *children;
553  ListCell *child;
554 
555  children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
556  NULL);
557 
558  foreach(child, children)
559  {
560  Oid childrelid = lfirst_oid(child);
561 
562  /* Allow query cancel in case this takes a long time */
563  CHECK_FOR_INTERRUPTS();
564 
565  /*
566  * Skip duplicates if user specified both parent and child
567  * tables.
568  */
569  if (list_member_oid(relids, childrelid))
570  continue;
571 
572  /* find_all_inheritors already got lock */
573  rel = table_open(childrelid, NoLock);
574  rels = lappend(rels, rel);
575  relids = lappend_oid(relids, childrelid);
576  }
577  }
578  }
579 
580  list_free(relids);
581 
582  return rels;
583 }
#define NIL
Definition: pg_list.h:65
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:372
#define lfirst_node(type, lc)
Definition: pg_list.h:172
#define NoLock
Definition: lockdefs.h:34
bool inh
Definition: primnodes.h:69
List * lappend(List *list, void *datum)
Definition: list.c:336
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: table.c:102
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:689
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:256
void list_free(List *list)
Definition: list.c:1391
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:120
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:477
#define lfirst_oid(lc)
Definition: pg_list.h:171

◆ parse_publication_options()

static void parse_publication_options ( ParseState * pstate,
List * options,
bool * publish_given,
PublicationActions * pubactions,
bool * publish_via_partition_root_given,
bool * publish_via_partition_root
)
static

Definition at line 58 of file publicationcmds.c.

References defGetBoolean(), defGetString(), DefElem::defname, ereport, errcode(), errmsg(), ERROR, errorConflictingDefElem(), lfirst, PublicationActions::pubdelete, PublicationActions::pubinsert, PublicationActions::pubtruncate, PublicationActions::pubupdate, and SplitIdentifierString().

Referenced by AlterPublicationOptions(), and CreatePublication().

64 {
65  ListCell *lc;
66 
67  *publish_given = false;
68  *publish_via_partition_root_given = false;
69 
70  /* defaults */
71  pubactions->pubinsert = true;
72  pubactions->pubupdate = true;
73  pubactions->pubdelete = true;
74  pubactions->pubtruncate = true;
75  *publish_via_partition_root = false;
76 
77  /* Parse options */
78  foreach(lc, options)
79  {
80  DefElem *defel = (DefElem *) lfirst(lc);
81 
82  if (strcmp(defel->defname, "publish") == 0)
83  {
84  char *publish;
85  List *publish_list;
86  ListCell *lc;
87 
88  if (*publish_given)
89  errorConflictingDefElem(defel, pstate);
90 
91  /*
92  * If publish option was given only the explicitly listed actions
93  * should be published.
94  */
95  pubactions->pubinsert = false;
96  pubactions->pubupdate = false;
97  pubactions->pubdelete = false;
98  pubactions->pubtruncate = false;
99 
100  *publish_given = true;
101  publish = defGetString(defel);
102 
103  if (!SplitIdentifierString(publish, ',', &publish_list))
104  ereport(ERROR,
105  (errcode(ERRCODE_SYNTAX_ERROR),
106  errmsg("invalid list syntax for \"publish\" option")));
107 
108  /* Process the option list. */
109  foreach(lc, publish_list)
110  {
111  char *publish_opt = (char *) lfirst(lc);
112 
113  if (strcmp(publish_opt, "insert") == 0)
114  pubactions->pubinsert = true;
115  else if (strcmp(publish_opt, "update") == 0)
116  pubactions->pubupdate = true;
117  else if (strcmp(publish_opt, "delete") == 0)
118  pubactions->pubdelete = true;
119  else if (strcmp(publish_opt, "truncate") == 0)
120  pubactions->pubtruncate = true;
121  else
122  ereport(ERROR,
123  (errcode(ERRCODE_SYNTAX_ERROR),
124  errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
125  }
126  }
127  else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
128  {
129  if (*publish_via_partition_root_given)
130  errorConflictingDefElem(defel, pstate);
131  *publish_via_partition_root_given = true;
132  *publish_via_partition_root = defGetBoolean(defel);
133  }
134  else
135  ereport(ERROR,
136  (errcode(ERRCODE_SYNTAX_ERROR),
137  errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
138  }
139 }
int errcode(int sqlerrcode)
Definition: elog.c:698
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:46
char * defGetString(DefElem *def)
Definition: define.c:49
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3753
#define ereport(elevel,...)
Definition: elog.h:157
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition: define.c:355
#define lfirst(lc)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:909
char * defname
Definition: parsenodes.h:746
Definition: pg_list.h:50

◆ PublicationAddTables()

static void PublicationAddTables ( Oid pubid,
List * rels,
bool if_not_exists,
AlterPublicationStmt * stmt
)
static

Definition at line 605 of file publicationcmds.c.

References aclcheck_error(), ACLCHECK_NOT_OWNER, Assert, EventTriggerCollectSimpleCommand(), AlterPublicationStmt::for_all_tables, get_relkind_objtype(), GetUserId(), InvalidObjectAddress, InvokeObjectPostCreateHook, lfirst, ObjectAddress::objectId, pg_class_ownercheck(), publication_add_relation(), RelationData::rd_rel, RelationGetRelationName, and RelationGetRelid.

Referenced by AlterPublicationTables(), and CreatePublication().

607 {
608  ListCell *lc;
609 
610  Assert(!stmt || !stmt->for_all_tables);
611 
612  foreach(lc, rels)
613  {
614  Relation rel = (Relation) lfirst(lc);
615  ObjectAddress obj;
616 
617  /* Must be owner of the table or superuser. */
621 
622  obj = publication_add_relation(pubid, rel, if_not_exists);
623  if (stmt)
624  {
626  (Node *) stmt);
627 
628  InvokeObjectPostCreateHook(PublicationRelRelationId,
629  obj.objectId, 0);
630  }
631  }
632 }
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
Oid GetUserId(void)
Definition: miscinit.c:478
Definition: nodes.h:539
Form_pg_class rd_rel
Definition: rel.h:109
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3308
struct RelationData * Relation
Definition: relcache.h:27
#define RelationGetRelationName(relation)
Definition: rel.h:511
#define Assert(condition)
Definition: c.h:804
#define lfirst(lc)
Definition: pg_list.h:169
bool pg_class_ownercheck(Oid class_oid, Oid roleid)
Definition: aclchk.c:4818
const ObjectAddress InvalidObjectAddress
ObjectType get_relkind_objtype(char relkind)
ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists)
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define RelationGetRelid(relation)
Definition: rel.h:477

◆ PublicationDropTables()

static void PublicationDropTables ( Oid pubid,
List * rels,
bool missing_ok
)
static

Definition at line 638 of file publicationcmds.c.

References DROP_CASCADE, ereport, errcode(), errmsg(), ERROR, GetSysCacheOid2, lfirst, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, performDeletion(), PUBLICATIONRELMAP, RelationGetRelationName, and RelationGetRelid.

Referenced by AlterPublicationTables().

639 {
640  ObjectAddress obj;
641  ListCell *lc;
642  Oid prid;
643 
644  foreach(lc, rels)
645  {
646  Relation rel = (Relation) lfirst(lc);
647  Oid relid = RelationGetRelid(rel);
648 
649  prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
650  ObjectIdGetDatum(relid),
651  ObjectIdGetDatum(pubid));
652  if (!OidIsValid(prid))
653  {
654  if (missing_ok)
655  continue;
656 
657  ereport(ERROR,
658  (errcode(ERRCODE_UNDEFINED_OBJECT),
659  errmsg("relation \"%s\" is not part of the publication",
660  RelationGetRelationName(rel))));
661  }
662 
663  ObjectAddressSet(obj, PublicationRelRelationId, prid);
664  performDeletion(&obj, DROP_CASCADE, 0);
665  }
666 }
int errcode(int sqlerrcode)
Definition: elog.c:698
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:710
struct RelationData * Relation
Definition: relcache.h:27
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:313
#define RelationGetRelationName(relation)
Definition: rel.h:511
#define ereport(elevel,...)
Definition: elog.h:157
#define lfirst(lc)
Definition: pg_list.h:169
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:195
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:909
#define RelationGetRelid(relation)
Definition: rel.h:477

◆ RemovePublicationRelById()

void RemovePublicationRelById ( Oid  proid)

Definition at line 474 of file publicationcmds.c.

References CacheInvalidateRelcacheByRelid(), CatalogTupleDelete(), elog, ERROR, GETSTRUCT, HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONREL, ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().

475 {
476  Relation rel;
477  HeapTuple tup;
479 
480  rel = table_open(PublicationRelRelationId, RowExclusiveLock);
481 
483 
484  if (!HeapTupleIsValid(tup))
485  elog(ERROR, "cache lookup failed for publication table %u",
486  proid);
487 
488  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
489 
490  /* Invalidate relcache so that publication info is rebuilt. */
491  CacheInvalidateRelcacheByRelid(pubrel->prrelid);
492 
493  CatalogTupleDelete(rel, &tup->t_self);
494 
495  ReleaseSysCache(tup);
496 
498 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
#define GETSTRUCT(TUP)
Definition: htup_details.h:654
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:350
#define ObjectIdGetDatum(X)
Definition: postgres.h:551
#define ERROR
Definition: elog.h:46
ItemPointerData t_self
Definition: htup.h:65
#define RowExclusiveLock
Definition: lockdefs.h:38
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1338
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1127
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1175
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define elog(elevel,...)
Definition: elog.h:232
FormData_pg_publication_rel * Form_pg_publication_rel
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39