PostgreSQL Source Code  git master
publicationcmds.c File Reference
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_type.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/publicationcmds.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
Include dependency graph for publicationcmds.c:

Go to the source code of this file.

Macros

#define MAX_RELCACHE_INVAL_MSGS   4096
 

Functions

static List * OpenTableList (List *tables)
 
static void CloseTableList (List *rels)
 
static void PublicationAddTables (Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
 
static void PublicationDropTables (Oid pubid, List *rels, bool missing_ok)
 
static void parse_publication_options (List *options, bool *publish_given, bool *publish_insert, bool *publish_update, bool *publish_delete, bool *publish_truncate)
 
ObjectAddress CreatePublication (CreatePublicationStmt *stmt)
 
static void AlterPublicationOptions (AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
 
static void AlterPublicationTables (AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
 
void AlterPublication (AlterPublicationStmt *stmt)
 
void RemovePublicationById (Oid pubid)
 
void RemovePublicationRelById (Oid proid)
 
static void AlterPublicationOwner_internal (Relation rel, HeapTuple tup, Oid newOwnerId)
 
ObjectAddress AlterPublicationOwner (const char *name, Oid newOwnerId)
 
void AlterPublicationOwner_oid (Oid subid, Oid newOwnerId)
 

Macro Definition Documentation

◆ MAX_RELCACHE_INVAL_MSGS

#define MAX_RELCACHE_INVAL_MSGS   4096

Definition at line 51 of file publicationcmds.c.

Referenced by AlterPublicationOptions().

Function Documentation

◆ AlterPublication()

void AlterPublication ( AlterPublicationStmt *  stmt)

Definition at line 416 of file publicationcmds.c.

References aclcheck_error(), ACLCHECK_NOT_OWNER, AlterPublicationOptions(), AlterPublicationTables(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, GetUserId(), heap_freetuple(), HeapTupleIsValid, OBJECT_PUBLICATION, AlterPublicationStmt::options, pg_publication_ownercheck(), PUBLICATIONNAME, AlterPublicationStmt::pubname, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ProcessUtilitySlow().

417 {
418  Relation rel;
419  HeapTuple tup;
420  Form_pg_publication pubform;
421 
422  rel = table_open(PublicationRelationId, RowExclusiveLock);
423 
424  tup = SearchSysCacheCopy1(PUBLICATIONNAME,
425  CStringGetDatum(stmt->pubname));
426 
427  if (!HeapTupleIsValid(tup))
428  ereport(ERROR,
429  (errcode(ERRCODE_UNDEFINED_OBJECT),
430  errmsg("publication \"%s\" does not exist",
431  stmt->pubname)));
432 
433  pubform = (Form_pg_publication) GETSTRUCT(tup);
434 
435  /* must be owner */
436  if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
437  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
438  stmt->pubname);
439 
440  if (stmt->options)
441  AlterPublicationOptions(stmt, rel, tup);
442  else
443  AlterPublicationTables(stmt, rel, tup);
444 
445  /* Cleanup. */
446  heap_freetuple(tup);
447  table_close(rel, RowExclusiveLock);
448 }
static void AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Oid GetUserId(void)
Definition: miscinit.c:380
bool pg_publication_ownercheck(Oid pub_oid, Oid roleid)
Definition: aclchk.c:5293
int errcode(int sqlerrcode)
Definition: elog.c:570
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3353
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
#define ereport(elevel, rest)
Definition: elog.h:141
static void AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:784
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationOptions()

static void AlterPublicationOptions ( AlterPublicationStmt *  stmt,
Relation  rel,
HeapTuple  tup 
)
static

Definition at line 250 of file publicationcmds.c.

References BoolGetDatum, CacheInvalidateRelcacheAll(), CacheInvalidateRelcacheByRelid(), CatalogTupleUpdate(), CommandCounterIncrement(), EventTriggerCollectSimpleCommand(), GetPublicationRelations(), GETSTRUCT, heap_modify_tuple(), InvalidObjectAddress, InvokeObjectPostAlterHook, lfirst_oid, list_length(), MAX_RELCACHE_INVAL_MSGS, ObjectAddressSet, AlterPublicationStmt::options, parse_publication_options(), RelationGetDescr, HeapTupleData::t_self, and values.

Referenced by AlterPublication().

252 {
253  bool nulls[Natts_pg_publication];
254  bool replaces[Natts_pg_publication];
255  Datum values[Natts_pg_publication];
256  bool publish_given;
257  bool publish_insert;
258  bool publish_update;
259  bool publish_delete;
260  bool publish_truncate;
261  ObjectAddress obj;
262  Form_pg_publication pubform;
263 
264  parse_publication_options(stmt->options,
265  &publish_given, &publish_insert,
266  &publish_update, &publish_delete,
267  &publish_truncate);
268 
269  /* Everything ok, form a new tuple. */
270  memset(values, 0, sizeof(values));
271  memset(nulls, false, sizeof(nulls));
272  memset(replaces, false, sizeof(replaces));
273 
274  if (publish_given)
275  {
276  values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
277  replaces[Anum_pg_publication_pubinsert - 1] = true;
278 
279  values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
280  replaces[Anum_pg_publication_pubupdate - 1] = true;
281 
282  values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
283  replaces[Anum_pg_publication_pubdelete - 1] = true;
284 
285  values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
286  replaces[Anum_pg_publication_pubtruncate - 1] = true;
287  }
288 
289  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
290  replaces);
291 
292  /* Update the catalog. */
293  CatalogTupleUpdate(rel, &tup->t_self, tup);
294 
295  CommandCounterIncrement();
296 
297  pubform = (Form_pg_publication) GETSTRUCT(tup);
298 
299  /* Invalidate the relcache. */
300  if (pubform->puballtables)
301  {
302  CacheInvalidateRelcacheAll();
303  }
304  else
305  {
306  List *relids = GetPublicationRelations(pubform->oid);
307 
308  /*
309  * We don't want to send too many individual messages, at some point
310  * it's cheaper to just reset whole relcache.
311  */
312  if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
313  {
314  ListCell *lc;
315 
316  foreach(lc, relids)
317  {
318  Oid relid = lfirst_oid(lc);
319 
320  CacheInvalidateRelcacheByRelid(relid);
321  }
322  }
323  else
324  CacheInvalidateRelcacheAll();
325  }
326 
327  ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
328  EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
329  (Node *) stmt);
330 
331  InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
332 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define RelationGetDescr(relation)
Definition: rel.h:445
Definition: nodes.h:525
unsigned int Oid
Definition: postgres_ext.h:31
#define MAX_RELCACHE_INVAL_MSGS
ItemPointerData t_self
Definition: htup.h:65
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1329
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:163
void CacheInvalidateRelcacheAll(void)
Definition: inval.c:1294
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1003
#define BoolGetDatum(X)
Definition: postgres.h:402
List * GetPublicationRelations(Oid pubid)
static void parse_publication_options(List *options, bool *publish_given, bool *publish_insert, bool *publish_update, bool *publish_delete, bool *publish_truncate)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:224
static int list_length(const List *l)
Definition: pg_list.h:169
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
const ObjectAddress InvalidObjectAddress
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
FormData_pg_publication * Form_pg_publication
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
Definition: pg_list.h:50
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ AlterPublicationOwner()

ObjectAddress AlterPublicationOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 720 of file publicationcmds.c.

References AlterPublicationOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), HeapTupleIsValid, ObjectAddressSet, PUBLICATIONNAME, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

721 {
722  Oid subid;
723  HeapTuple tup;
724  Relation rel;
725  ObjectAddress address;
726  Form_pg_publication pubform;
727 
728  rel = table_open(PublicationRelationId, RowExclusiveLock);
729 
730  tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
731 
732  if (!HeapTupleIsValid(tup))
733  ereport(ERROR,
734  (errcode(ERRCODE_UNDEFINED_OBJECT),
735  errmsg("publication \"%s\" does not exist", name)));
736 
737  pubform = (Form_pg_publication) GETSTRUCT(tup);
738  subid = pubform->oid;
739 
740  AlterPublicationOwner_internal(rel, tup, newOwnerId);
741 
742  ObjectAddressSet(address, PublicationRelationId, subid);
743 
744  heap_freetuple(tup);
745 
746  table_close(rel, RowExclusiveLock);
747 
748  return address;
749 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
int errcode(int sqlerrcode)
Definition: elog.c:570
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
#define ereport(elevel, rest)
Definition: elog.h:141
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
const char * name
Definition: encode.c:521
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:784
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationOwner_internal()

static void AlterPublicationOwner_internal ( Relation  rel,
HeapTuple  tup,
Oid  newOwnerId 
)
static

Definition at line 669 of file publicationcmds.c.

References ACL_CREATE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, CatalogTupleUpdate(), changeDependencyOnOwner(), check_is_member_of_role(), ereport, errcode(), errhint(), errmsg(), ERROR, get_database_name(), GETSTRUCT, GetUserId(), InvokeObjectPostAlterHook, MyDatabaseId, NameStr, OBJECT_DATABASE, OBJECT_PUBLICATION, pg_database_aclcheck(), pg_publication_ownercheck(), superuser(), superuser_arg(), and HeapTupleData::t_self.

Referenced by AlterPublicationOwner(), and AlterPublicationOwner_oid().

670 {
671  Form_pg_publication form;
672 
673  form = (Form_pg_publication) GETSTRUCT(tup);
674 
675  if (form->pubowner == newOwnerId)
676  return;
677 
678  if (!superuser())
679  {
680  AclResult aclresult;
681 
682  /* Must be owner */
683  if (!pg_publication_ownercheck(form->oid, GetUserId()))
684  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
685  NameStr(form->pubname));
686 
687  /* Must be able to become new owner */
688  check_is_member_of_role(GetUserId(), newOwnerId);
689 
690  /* New owner must have CREATE privilege on database */
691  aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
692  if (aclresult != ACLCHECK_OK)
693  aclcheck_error(aclresult, OBJECT_DATABASE,
694  get_database_name(MyDatabaseId));
695 
696  if (form->puballtables && !superuser_arg(newOwnerId))
697  ereport(ERROR,
698  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
699  errmsg("permission denied to change owner of publication \"%s\"",
700  NameStr(form->pubname)),
701  errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
702  }
703 
704  form->pubowner = newOwnerId;
705  CatalogTupleUpdate(rel, &tup->t_self, tup);
706 
707  /* Update owner dependency reference */
708  changeDependencyOnOwner(PublicationRelationId,
709  form->oid,
710  newOwnerId);
711 
712  InvokeObjectPostAlterHook(PublicationRelationId,
713  form->oid, 0);
714 }
int errhint(const char *fmt,...)
Definition: elog.c:974
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Oid GetUserId(void)
Definition: miscinit.c:380
bool pg_publication_ownercheck(Oid pub_oid, Oid roleid)
Definition: aclchk.c:5293
int errcode(int sqlerrcode)
Definition: elog.c:570
bool superuser(void)
Definition: superuser.c:47
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:310
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3353
#define ERROR
Definition: elog.h:43
#define ACL_CREATE
Definition: parsenodes.h:84
ItemPointerData t_self
Definition: htup.h:65
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2100
void check_is_member_of_role(Oid member, Oid role)
Definition: acl.c:4954
#define ereport(elevel, rest)
Definition: elog.h:141
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:163
bool superuser_arg(Oid roleid)
Definition: superuser.c:57
AclResult
Definition: acl.h:177
Oid MyDatabaseId
Definition: globals.c:85
AclResult pg_database_aclcheck(Oid db_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4643
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:224
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define NameStr(name)
Definition: c.h:609
FormData_pg_publication * Form_pg_publication

◆ AlterPublicationOwner_oid()

void AlterPublicationOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 755 of file publicationcmds.c.

References AlterPublicationOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONOID, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by shdepReassignOwned().

756 {
757  HeapTuple tup;
758  Relation rel;
759 
760  rel = table_open(PublicationRelationId, RowExclusiveLock);
761 
762  tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
763 
764  if (!HeapTupleIsValid(tup))
765  ereport(ERROR,
766  (errcode(ERRCODE_UNDEFINED_OBJECT),
767  errmsg("publication with OID %u does not exist", subid)));
768 
769  AlterPublicationOwner_internal(rel, tup, newOwnerId);
770 
771  heap_freetuple(tup);
772 
773  table_close(rel, RowExclusiveLock);
774 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errcode(int sqlerrcode)
Definition: elog.c:570
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define ereport(elevel, rest)
Definition: elog.h:141
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:784
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationTables()

static void AlterPublicationTables ( AlterPublicationStmt *  stmt,
Relation  rel,
HeapTuple  tup 
)
static

Definition at line 338 of file publicationcmds.c.

References Assert, CloseTableList(), DEFELEM_ADD, DEFELEM_DROP, ereport, errcode(), errdetail(), errmsg(), ERROR, GetPublicationRelations(), GETSTRUCT, lappend(), lfirst, lfirst_oid, list_length(), NameStr, NIL, OpenTableList(), PublicationAddTables(), PublicationDropTables(), RelationGetRelid, ShareUpdateExclusiveLock, table_open(), AlterPublicationStmt::tableAction, and AlterPublicationStmt::tables.

Referenced by AlterPublication().

340 {
341  List *rels = NIL;
342  Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
343  Oid pubid = pubform->oid;
344 
345  /* Check that user is allowed to manipulate the publication tables. */
346  if (pubform->puballtables)
347  ereport(ERROR,
348  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
349  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
350  NameStr(pubform->pubname)),
351  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
352 
353  Assert(list_length(stmt->tables) > 0);
354 
355  rels = OpenTableList(stmt->tables);
356 
357  if (stmt->tableAction == DEFELEM_ADD)
358  PublicationAddTables(pubid, rels, false, stmt);
359  else if (stmt->tableAction == DEFELEM_DROP)
360  PublicationDropTables(pubid, rels, false);
361  else /* DEFELEM_SET */
362  {
363  List *oldrelids = GetPublicationRelations(pubid);
364  List *delrels = NIL;
365  ListCell *oldlc;
366 
367  /* Calculate which relations to drop. */
368  foreach(oldlc, oldrelids)
369  {
370  Oid oldrelid = lfirst_oid(oldlc);
371  ListCell *newlc;
372  bool found = false;
373 
374  foreach(newlc, rels)
375  {
376  Relation newrel = (Relation) lfirst(newlc);
377 
378  if (RelationGetRelid(newrel) == oldrelid)
379  {
380  found = true;
381  break;
382  }
383  }
384 
385  if (!found)
386  {
387  Relation oldrel = table_open(oldrelid,
388  ShareUpdateExclusiveLock);
389 
390  delrels = lappend(delrels, oldrel);
391  }
392  }
393 
394  /* And drop them. */
395  PublicationDropTables(pubid, delrels, true);
396 
397  /*
398  * Don't bother calculating the difference for adding, we'll catch and
399  * skip existing ones when doing catalog update.
400  */
401  PublicationAddTables(pubid, rels, true, stmt);
402 
403  CloseTableList(delrels);
404  }
405 
406  CloseTableList(rels);
407 }
#define NIL
Definition: pg_list.h:65
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
int errcode(int sqlerrcode)
Definition: elog.c:570
unsigned int Oid
Definition: postgres_ext.h:31
static void CloseTableList(List *rels)
struct RelationData * Relation
Definition: relcache.h:26
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:860
#define ereport(elevel, rest)
Definition: elog.h:141
List * lappend(List *list, void *datum)
Definition: list.c:322
DefElemAction tableAction
Definition: parsenodes.h:3497
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
List * GetPublicationRelations(Oid pubid)
static int list_length(const List *l)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define NameStr(name)
Definition: c.h:609
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:419
static List * OpenTableList(List *tables)
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ CloseTableList()

static void CloseTableList ( List *  rels)
static

Definition at line 586 of file publicationcmds.c.

References lfirst, NoLock, and table_close().

Referenced by AlterPublicationTables(), and CreatePublication().

587 {
588  ListCell *lc;
589 
590  foreach(lc, rels)
591  {
592  Relation rel = (Relation) lfirst(lc);
593 
594  table_close(rel, NoLock);
595  }
596 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
struct RelationData * Relation
Definition: relcache.h:26
#define NoLock
Definition: lockdefs.h:34
#define lfirst(lc)
Definition: pg_list.h:190

◆ CreatePublication()

ObjectAddress CreatePublication ( CreatePublicationStmt *  stmt)

Definition at line 140 of file publicationcmds.c.

References ACL_CREATE, aclcheck_error(), ACLCHECK_OK, Assert, BoolGetDatum, CatalogTupleInsert(), CloseTableList(), CommandCounterIncrement(), CStringGetDatum, DirectFunctionCall1, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg(), ERROR, CreatePublicationStmt::for_all_tables, get_database_name(), GetNewOidWithIndex(), GetSysCacheOid1, GetUserId(), heap_form_tuple(), heap_freetuple(), InvokeObjectPostCreateHook, list_length(), MyDatabaseId, namein(), OBJECT_DATABASE, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, OpenTableList(), CreatePublicationStmt::options, parse_publication_options(), pg_database_aclcheck(), PublicationAddTables(), PUBLICATIONNAME, PublicationObjectIndexId, CreatePublicationStmt::pubname, recordDependencyOnOwner(), RelationGetDescr, RowExclusiveLock, superuser(), table_close(), table_open(), CreatePublicationStmt::tables, values, wal_level, WAL_LEVEL_LOGICAL, and WARNING.

Referenced by ProcessUtilitySlow().

141 {
142  Relation rel;
143  ObjectAddress myself;
144  Oid puboid;
145  bool nulls[Natts_pg_publication];
146  Datum values[Natts_pg_publication];
147  HeapTuple tup;
148  bool publish_given;
149  bool publish_insert;
150  bool publish_update;
151  bool publish_delete;
152  bool publish_truncate;
153  AclResult aclresult;
154 
155  /* must have CREATE privilege on database */
156  aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
157  if (aclresult != ACLCHECK_OK)
158  aclcheck_error(aclresult, OBJECT_DATABASE,
159  get_database_name(MyDatabaseId));
160 
161  /* FOR ALL TABLES requires superuser */
162  if (stmt->for_all_tables && !superuser())
163  ereport(ERROR,
164  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
165  (errmsg("must be superuser to create FOR ALL TABLES publication"))));
166 
167  rel = table_open(PublicationRelationId, RowExclusiveLock);
168 
169  /* Check if name is used */
170  puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
171  CStringGetDatum(stmt->pubname));
172  if (OidIsValid(puboid))
173  {
174  ereport(ERROR,
175  (errcode(ERRCODE_DUPLICATE_OBJECT),
176  errmsg("publication \"%s\" already exists",
177  stmt->pubname)));
178  }
179 
180  /* Form a tuple. */
181  memset(values, 0, sizeof(values));
182  memset(nulls, false, sizeof(nulls));
183 
184  values[Anum_pg_publication_pubname - 1] =
185  DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
186  values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
187 
188  parse_publication_options(stmt->options,
189  &publish_given, &publish_insert,
190  &publish_update, &publish_delete,
191  &publish_truncate);
192 
193  puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
194  Anum_pg_publication_oid);
195  values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
196  values[Anum_pg_publication_puballtables - 1] =
197  BoolGetDatum(stmt->for_all_tables);
198  values[Anum_pg_publication_pubinsert - 1] =
199  BoolGetDatum(publish_insert);
200  values[Anum_pg_publication_pubupdate - 1] =
201  BoolGetDatum(publish_update);
202  values[Anum_pg_publication_pubdelete - 1] =
203  BoolGetDatum(publish_delete);
204  values[Anum_pg_publication_pubtruncate - 1] =
205  BoolGetDatum(publish_truncate);
206 
207  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
208 
209  /* Insert tuple into catalog. */
210  CatalogTupleInsert(rel, tup);
211  heap_freetuple(tup);
212 
213  recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
214 
215  ObjectAddressSet(myself, PublicationRelationId, puboid);
216 
217  /* Make the changes visible. */
218  CommandCounterIncrement();
219 
220  if (stmt->tables)
221  {
222  List *rels;
223 
224  Assert(list_length(stmt->tables) > 0);
225 
226  rels = OpenTableList(stmt->tables);
227  PublicationAddTables(puboid, rels, true, NULL);
228  CloseTableList(rels);
229  }
230 
231  table_close(rel, RowExclusiveLock);
232 
233  InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
234 
235  if (wal_level != WAL_LEVEL_LOGICAL)
236  {
237  ereport(WARNING,
238  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
239  errmsg("wal_level is insufficient to publish logical changes"),
240  errhint("Set wal_level to logical before creating subscriptions.")));
241  }
242 
243  return myself;
244 }
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:323
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errhint(const char *fmt,...)
Definition: elog.c:974
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:145
#define RelationGetDescr(relation)
Definition: rel.h:445
Oid GetUserId(void)
Definition: miscinit.c:380
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:192
int wal_level
Definition: xlog.c:103
int errcode(int sqlerrcode)
Definition: elog.c:570
bool superuser(void)
Definition: superuser.c:47
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:617
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:165
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:638
static void CloseTableList(List *rels)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3353
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define PublicationObjectIndexId
Definition: indexing.h:349
#define ACL_CREATE
Definition: parsenodes.h:84
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2100
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
#define ereport(elevel, rest)
Definition: elog.h:141
#define WARNING
Definition: elog.h:40
AclResult
Definition: acl.h:177
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1003
Oid MyDatabaseId
Definition: globals.c:85
#define BoolGetDatum(X)
Definition: postgres.h:402
AclResult pg_database_aclcheck(Oid db_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4643
#define Assert(condition)
Definition: c.h:732
static void parse_publication_options(List *options, bool *publish_given, bool *publish_insert, bool *publish_update, bool *publish_delete, bool *publish_truncate)
static int list_length(const List *l)
Definition: pg_list.h:169
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:784
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:33
Definition: pg_list.h:50
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:183
static List * OpenTableList(List *tables)

◆ OpenTableList()

static List * OpenTableList ( List *  tables)
static

Definition at line 508 of file publicationcmds.c.

References castNode, CHECK_FOR_INTERRUPTS, find_all_inheritors(), RangeVar::inh, lappend(), lappend_oid(), lfirst, lfirst_oid, list_free(), list_member_oid(), NIL, NoLock, RelationGetRelid, ShareUpdateExclusiveLock, table_close(), table_open(), and table_openrv().

Referenced by AlterPublicationTables(), and CreatePublication().

509 {
510  List *relids = NIL;
511  List *rels = NIL;
512  ListCell *lc;
513 
514  /*
515  * Open, share-lock, and check all the explicitly-specified relations
516  */
517  foreach(lc, tables)
518  {
519  RangeVar *rv = castNode(RangeVar, lfirst(lc));
520  bool recurse = rv->inh;
521  Relation rel;
522  Oid myrelid;
523 
524  /* Allow query cancel in case this takes a long time */
525  CHECK_FOR_INTERRUPTS();
526 
527  rel = table_openrv(rv, ShareUpdateExclusiveLock);
528  myrelid = RelationGetRelid(rel);
529 
530  /*
531  * Filter out duplicates if user specifies "foo, foo".
532  *
533  * Note that this algorithm is known to not be very efficient (O(N^2))
534  * but given that it only works on list of tables given to us by user
535  * it's deemed acceptable.
536  */
537  if (list_member_oid(relids, myrelid))
538  {
539  table_close(rel, ShareUpdateExclusiveLock);
540  continue;
541  }
542 
543  rels = lappend(rels, rel);
544  relids = lappend_oid(relids, myrelid);
545 
546  /* Add children of this rel, if requested */
547  if (recurse)
548  {
549  List *children;
550  ListCell *child;
551 
552  children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
553  NULL);
554 
555  foreach(child, children)
556  {
557  Oid childrelid = lfirst_oid(child);
558 
559  /* Allow query cancel in case this takes a long time */
560  CHECK_FOR_INTERRUPTS();
561 
562  /*
563  * Skip duplicates if user specified both parent and child
564  * tables.
565  */
566  if (list_member_oid(relids, childrelid))
567  continue;
568 
569  /* find_all_inheritors already got lock */
570  rel = table_open(childrelid, NoLock);
571  rels = lappend(rels, rel);
572  relids = lappend_oid(relids, childrelid);
573  }
574  }
575  }
576 
577  list_free(relids);
578 
579  return rels;
580 }
#define NIL
Definition: pg_list.h:65
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define castNode(_type_, nodeptr)
Definition: nodes.h:594
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:358
#define NoLock
Definition: lockdefs.h:34
bool inh
Definition: primnodes.h:69
List * lappend(List *list, void *datum)
Definition: list.c:322
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: table.c:68
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:675
#define lfirst(lc)
Definition: pg_list.h:190
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:165
void list_free(List *list)
Definition: list.c:1377
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:419
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ parse_publication_options()

static void parse_publication_options ( List *  options,
bool *  publish_given,
bool *  publish_insert,
bool *  publish_update,
bool *  publish_delete,
bool *  publish_truncate 
)
static

Definition at line 60 of file publicationcmds.c.

References defGetString(), DefElem::defname, ereport, errcode(), errmsg(), ERROR, lfirst, and SplitIdentifierString().

Referenced by AlterPublicationOptions(), and CreatePublication().

66 {
67  ListCell *lc;
68 
69  *publish_given = false;
70 
71  /* Defaults are true */
72  *publish_insert = true;
73  *publish_update = true;
74  *publish_delete = true;
75  *publish_truncate = true;
76 
77  /* Parse options */
78  foreach(lc, options)
79  {
80  DefElem *defel = (DefElem *) lfirst(lc);
81 
82  if (strcmp(defel->defname, "publish") == 0)
83  {
84  char *publish;
85  List *publish_list;
86  ListCell *lc;
87 
88  if (*publish_given)
89  ereport(ERROR,
90  (errcode(ERRCODE_SYNTAX_ERROR),
91  errmsg("conflicting or redundant options")));
92 
93  /*
94  * If publish option was given only the explicitly listed actions
95  * should be published.
96  */
97  *publish_insert = false;
98  *publish_update = false;
99  *publish_delete = false;
100  *publish_truncate = false;
101 
102  *publish_given = true;
103  publish = defGetString(defel);
104 
105  if (!SplitIdentifierString(publish, ',', &publish_list))
106  ereport(ERROR,
107  (errcode(ERRCODE_SYNTAX_ERROR),
108  errmsg("invalid list syntax for \"publish\" option")));
109 
110  /* Process the option list. */
111  foreach(lc, publish_list)
112  {
113  char *publish_opt = (char *) lfirst(lc);
114 
115  if (strcmp(publish_opt, "insert") == 0)
116  *publish_insert = true;
117  else if (strcmp(publish_opt, "update") == 0)
118  *publish_update = true;
119  else if (strcmp(publish_opt, "delete") == 0)
120  *publish_delete = true;
121  else if (strcmp(publish_opt, "truncate") == 0)
122  *publish_truncate = true;
123  else
124  ereport(ERROR,
125  (errcode(ERRCODE_SYNTAX_ERROR),
126  errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
127  }
128  }
129  else
130  ereport(ERROR,
131  (errcode(ERRCODE_SYNTAX_ERROR),
132  errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
133  }
134 }
int errcode(int sqlerrcode)
Definition: elog.c:570
#define ERROR
Definition: elog.h:43
char * defGetString(DefElem *def)
Definition: define.c:49
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3644
#define ereport(elevel, rest)
Definition: elog.h:141
#define lfirst(lc)
Definition: pg_list.h:190
int errmsg(const char *fmt,...)
Definition: elog.c:784
char * defname
Definition: parsenodes.h:730
Definition: pg_list.h:50

◆ PublicationAddTables()

static void PublicationAddTables ( Oid  pubid,
List * rels,
bool  if_not_exists,
AlterPublicationStmt * stmt
)
static

Definition at line 602 of file publicationcmds.c.

References aclcheck_error(), ACLCHECK_NOT_OWNER, Assert, EventTriggerCollectSimpleCommand(), AlterPublicationStmt::for_all_tables, get_relkind_objtype(), GetUserId(), InvalidObjectAddress, InvokeObjectPostCreateHook, lfirst, ObjectAddress::objectId, pg_class_ownercheck(), publication_add_relation(), RelationData::rd_rel, RelationGetRelationName, and RelationGetRelid.

Referenced by AlterPublicationTables(), and CreatePublication().

604 {
605  ListCell *lc;
606 
607  Assert(!stmt || !stmt->for_all_tables);
608 
609  foreach(lc, rels)
610  {
611  Relation rel = (Relation) lfirst(lc);
612  ObjectAddress obj;
613 
614  /* Must be owner of the table or superuser. */
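615  if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
616  aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
617  RelationGetRelationName(rel));
618 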
619  obj = publication_add_relation(pubid, rel, if_not_exists);
620  if (stmt)
621  {
622  EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
623  (Node *) stmt);
624 
625  InvokeObjectPostCreateHook(PublicationRelRelationId,
626  obj.objectId, 0);
627  }
628  }
629 }
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:145
Oid GetUserId(void)
Definition: miscinit.c:380
Definition: nodes.h:525
Form_pg_class rd_rel
Definition: rel.h:83
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3353
struct RelationData * Relation
Definition: relcache.h:26
#define RelationGetRelationName(relation)
Definition: rel.h:453
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
bool pg_class_ownercheck(Oid class_oid, Oid roleid)
Definition: aclchk.c:4755
const ObjectAddress InvalidObjectAddress
ObjectType get_relkind_objtype(char relkind)
ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists)
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define RelationGetRelid(relation)
Definition: rel.h:419

◆ PublicationDropTables()

static void PublicationDropTables ( Oid  pubid,
List * rels,
bool  missing_ok 
)
static

Definition at line 635 of file publicationcmds.c.

References DROP_CASCADE, ereport, errcode(), errmsg(), ERROR, GetSysCacheOid2, lfirst, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, performDeletion(), PUBLICATIONRELMAP, RelationGetRelationName, and RelationGetRelid.

Referenced by AlterPublicationTables().

636 {
637  ObjectAddress obj;
638  ListCell *lc;
639  Oid prid;
640 
641  foreach(lc, rels)
642  {
643  Relation rel = (Relation) lfirst(lc);
644  Oid relid = RelationGetRelid(rel);
645 
646  prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
647  ObjectIdGetDatum(relid),
648  ObjectIdGetDatum(pubid));
649  if (!OidIsValid(prid))
650  {
651  if (missing_ok)
652  continue;
653 
654  ereport(ERROR,
655  (errcode(ERRCODE_UNDEFINED_OBJECT),
656  errmsg("relation \"%s\" is not part of the publication",
657  RelationGetRelationName(rel))));
658  }
659 
660  ObjectAddressSet(obj, PublicationRelRelationId, prid);
661  performDeletion(&obj, DROP_CASCADE, 0);
662  }
663 }
int errcode(int sqlerrcode)
Definition: elog.c:570
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:638
struct RelationData * Relation
Definition: relcache.h:26
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:315
#define RelationGetRelationName(relation)
Definition: rel.h:453
#define ereport(elevel, rest)
Definition: elog.h:141
#define lfirst(lc)
Definition: pg_list.h:190
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:194
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define RelationGetRelid(relation)
Definition: rel.h:419

◆ RemovePublicationById()

void RemovePublicationById ( Oid  pubid)

Definition at line 454 of file publicationcmds.c.

References CatalogTupleDelete(), elog, ERROR, HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONOID, ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().

455 {
456  Relation rel;
457  HeapTuple tup;
458 
459  rel = table_open(PublicationRelationId, RowExclusiveLock);
460 
461  tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
462 
463  if (!HeapTupleIsValid(tup))
464  elog(ERROR, "cache lookup failed for publication %u", pubid);
465 
466  CatalogTupleDelete(rel, &tup->t_self);
467 
468  ReleaseSysCache(tup);
469 
470  table_close(rel, RowExclusiveLock);
471 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:269
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define RowExclusiveLock
Definition: lockdefs.h:38
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1124
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1172
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define elog(elevel,...)
Definition: elog.h:226
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ RemovePublicationRelById()

void RemovePublicationRelById ( Oid  proid)

Definition at line 477 of file publicationcmds.c.

References CacheInvalidateRelcacheByRelid(), CatalogTupleDelete(), elog, ERROR, GETSTRUCT, HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONREL, ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().

478 {
479  Relation rel;
480  HeapTuple tup;
481  Form_pg_publication_rel pubrel;
482 
483  rel = table_open(PublicationRelRelationId, RowExclusiveLock);
484 
485  tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
486 
487  if (!HeapTupleIsValid(tup))
488  elog(ERROR, "cache lookup failed for publication table %u",
489  proid);
490 
491  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
492 
493  /* Invalidate relcache so that publication info is rebuilt. */
494  CacheInvalidateRelcacheByRelid(pubrel->prrelid);
495 
496  CatalogTupleDelete(rel, &tup->t_self);
497 
498  ReleaseSysCache(tup);
499 
500  table_close(rel, RowExclusiveLock);
501 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:269
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define RowExclusiveLock
Definition: lockdefs.h:38
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1329
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1124
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1172
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define elog(elevel,...)
Definition: elog.h:226
FormData_pg_publication_rel * Form_pg_publication_rel
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39