PostgreSQL Source Code  git master
publicationcmds.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/publicationcmds.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
Include dependency graph for publicationcmds.c:

Go to the source code of this file.

Macros

#define MAX_RELCACHE_INVAL_MSGS   4096
 

Functions

static List * OpenTableList (List *tables)
 
static void CloseTableList (List *rels)
 
static void PublicationAddTables (Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
 
static void PublicationDropTables (Oid pubid, List *rels, bool missing_ok)
 
static void parse_publication_options (List *options, bool *publish_given, bool *publish_insert, bool *publish_update, bool *publish_delete, bool *publish_truncate)
 
ObjectAddress CreatePublication (CreatePublicationStmt *stmt)
 
static void AlterPublicationOptions (AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
 
static void AlterPublicationTables (AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
 
void AlterPublication (AlterPublicationStmt *stmt)
 
void RemovePublicationById (Oid pubid)
 
void RemovePublicationRelById (Oid proid)
 
static void AlterPublicationOwner_internal (Relation rel, HeapTuple tup, Oid newOwnerId)
 
ObjectAddress AlterPublicationOwner (const char *name, Oid newOwnerId)
 
void AlterPublicationOwner_oid (Oid subid, Oid newOwnerId)
 

Macro Definition Documentation

◆ MAX_RELCACHE_INVAL_MSGS

#define MAX_RELCACHE_INVAL_MSGS   4096

Definition at line 48 of file publicationcmds.c.

Referenced by AlterPublicationOptions().

Function Documentation

◆ AlterPublication()

void AlterPublication ( AlterPublicationStmt * stmt)

Definition at line 420 of file publicationcmds.c.

References aclcheck_error(), ACLCHECK_NOT_OWNER, AlterPublicationOptions(), AlterPublicationTables(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, GetUserId(), heap_freetuple(), HeapTupleIsValid, OBJECT_PUBLICATION, AlterPublicationStmt::options, pg_publication_ownercheck(), PUBLICATIONNAME, AlterPublicationStmt::pubname, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ProcessUtilitySlow().

421 {
422  Relation rel;
423  HeapTuple tup;
424  Form_pg_publication pubform;
425 
426  rel = table_open(PublicationRelationId, RowExclusiveLock);
427 
428  tup = SearchSysCacheCopy1(PUBLICATIONNAME,
429  CStringGetDatum(stmt->pubname));
430 
431  if (!HeapTupleIsValid(tup))
432  ereport(ERROR,
433  (errcode(ERRCODE_UNDEFINED_OBJECT),
434  errmsg("publication \"%s\" does not exist",
435  stmt->pubname)));
436 
437  pubform = (Form_pg_publication) GETSTRUCT(tup);
438 
439  /* must be owner */
440  if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
441  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
442  stmt->pubname);
443 
444  if (stmt->options)
445  AlterPublicationOptions(stmt, rel, tup);
446  else
447  AlterPublicationTables(stmt, rel, tup);
448 
449  /* Cleanup. */
450  heap_freetuple(tup);
451  table_close(rel, RowExclusiveLock);
452 }
static void AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Oid GetUserId(void)
Definition: miscinit.c:439
bool pg_publication_ownercheck(Oid pub_oid, Oid roleid)
Definition: aclchk.c:5291
int errcode(int sqlerrcode)
Definition: elog.c:610
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3352
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
static void AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
#define ereport(elevel,...)
Definition: elog.h:144
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:824
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationOptions()

static void AlterPublicationOptions ( AlterPublicationStmt * stmt,
Relation  rel,
HeapTuple  tup 
)
static

Definition at line 247 of file publicationcmds.c.

References BoolGetDatum, CacheInvalidateRelcacheAll(), CacheInvalidateRelcacheByRelid(), CatalogTupleUpdate(), CommandCounterIncrement(), EventTriggerCollectSimpleCommand(), GetPublicationRelations(), GETSTRUCT, heap_modify_tuple(), InvalidObjectAddress, InvokeObjectPostAlterHook, lfirst_oid, list_length(), MAX_RELCACHE_INVAL_MSGS, ObjectAddressSet, AlterPublicationStmt::options, parse_publication_options(), PUBLICATION_PART_ALL, RelationGetDescr, HeapTupleData::t_self, and values.

Referenced by AlterPublication().

249 {
250  bool nulls[Natts_pg_publication];
251  bool replaces[Natts_pg_publication];
252  Datum values[Natts_pg_publication];
253  bool publish_given;
254  bool publish_insert;
255  bool publish_update;
256  bool publish_delete;
257  bool publish_truncate;
258  ObjectAddress obj;
259  Form_pg_publication pubform;
260 
261  parse_publication_options(stmt->options,
262  &publish_given, &publish_insert,
263  &publish_update, &publish_delete,
264  &publish_truncate);
265 
266  /* Everything ok, form a new tuple. */
267  memset(values, 0, sizeof(values));
268  memset(nulls, false, sizeof(nulls));
269  memset(replaces, false, sizeof(replaces));
270 
271  if (publish_given)
272  {
273  values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
274  replaces[Anum_pg_publication_pubinsert - 1] = true;
275 
276  values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
277  replaces[Anum_pg_publication_pubupdate - 1] = true;
278 
279  values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
280  replaces[Anum_pg_publication_pubdelete - 1] = true;
281 
282  values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
283  replaces[Anum_pg_publication_pubtruncate - 1] = true;
284  }
285 
286  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
287  replaces);
288 
289  /* Update the catalog. */
290  CatalogTupleUpdate(rel, &tup->t_self, tup);
291 
292  CommandCounterIncrement();
293 
294  pubform = (Form_pg_publication) GETSTRUCT(tup);
295 
296  /* Invalidate the relcache. */
297  if (pubform->puballtables)
298  {
299  CacheInvalidateRelcacheAll();
300  }
301  else
302  {
303  /*
304  * For any partitioned tables contained in the publication, we must
305  * invalidate all partitions contained in the respective partition
306  * trees, not just those explicitly mentioned in the publication.
307  */
308  List *relids = GetPublicationRelations(pubform->oid,
309  PUBLICATION_PART_ALL);
310 
311  /*
312  * We don't want to send too many individual messages, at some point
313  * it's cheaper to just reset whole relcache.
314  */
315  if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
316  {
317  ListCell *lc;
318 
319  foreach(lc, relids)
320  {
321  Oid relid = lfirst_oid(lc);
322 
323  CacheInvalidateRelcacheByRelid(relid);
324  }
325  }
326  else
327  CacheInvalidateRelcacheAll();
328  }
329 
330  ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
331  EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
332  (Node *) stmt);
333 
334  InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
335 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
#define RelationGetDescr(relation)
Definition: rel.h:461
Definition: nodes.h:526
unsigned int Oid
Definition: postgres_ext.h:31
#define MAX_RELCACHE_INVAL_MSGS
ItemPointerData t_self
Definition: htup.h:65
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1329
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
void CacheInvalidateRelcacheAll(void)
Definition: inval.c:1294
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1005
#define BoolGetDatum(X)
Definition: postgres.h:402
static void parse_publication_options(List *options, bool *publish_given, bool *publish_insert, bool *publish_update, bool *publish_delete, bool *publish_truncate)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:224
static int list_length(const List *l)
Definition: pg_list.h:169
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
const ObjectAddress InvalidObjectAddress
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
FormData_pg_publication * Form_pg_publication
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
Definition: pg_list.h:50
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ AlterPublicationOwner()

ObjectAddress AlterPublicationOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 730 of file publicationcmds.c.

References AlterPublicationOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), HeapTupleIsValid, ObjectAddressSet, PUBLICATIONNAME, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

731 {
732  Oid subid;
733  HeapTuple tup;
734  Relation rel;
735  ObjectAddress address;
736  Form_pg_publication pubform;
737 
738  rel = table_open(PublicationRelationId, RowExclusiveLock);
739 
740  tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
741 
742  if (!HeapTupleIsValid(tup))
743  ereport(ERROR,
744  (errcode(ERRCODE_UNDEFINED_OBJECT),
745  errmsg("publication \"%s\" does not exist", name)));
746 
747  pubform = (Form_pg_publication) GETSTRUCT(tup);
748  subid = pubform->oid;
749 
750  AlterPublicationOwner_internal(rel, tup, newOwnerId);
751 
752  ObjectAddressSet(address, PublicationRelationId, subid);
753 
754  heap_freetuple(tup);
755 
756  table_close(rel, RowExclusiveLock);
757 
758  return address;
759 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
int errcode(int sqlerrcode)
Definition: elog.c:610
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define ereport(elevel,...)
Definition: elog.h:144
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
const char * name
Definition: encode.c:521
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:824
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationOwner_internal()

static void AlterPublicationOwner_internal ( Relation  rel,
HeapTuple  tup,
Oid  newOwnerId 
)
static

Definition at line 679 of file publicationcmds.c.

References ACL_CREATE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, CatalogTupleUpdate(), changeDependencyOnOwner(), check_is_member_of_role(), ereport, errcode(), errhint(), errmsg(), ERROR, get_database_name(), GETSTRUCT, GetUserId(), InvokeObjectPostAlterHook, MyDatabaseId, NameStr, OBJECT_DATABASE, OBJECT_PUBLICATION, pg_database_aclcheck(), pg_publication_ownercheck(), superuser(), superuser_arg(), and HeapTupleData::t_self.

Referenced by AlterPublicationOwner(), and AlterPublicationOwner_oid().

680 {
681  Form_pg_publication form;
682 
683  form = (Form_pg_publication) GETSTRUCT(tup);
684 
685  if (form->pubowner == newOwnerId)
686  return;
687 
688  if (!superuser())
689  {
690  AclResult aclresult;
691 
692  /* Must be owner */
693  if (!pg_publication_ownercheck(form->oid, GetUserId()))
694  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
695  NameStr(form->pubname));
696 
697  /* Must be able to become new owner */
698  check_is_member_of_role(GetUserId(), newOwnerId);
699 
700  /* New owner must have CREATE privilege on database */
701  aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
702  if (aclresult != ACLCHECK_OK)
703  aclcheck_error(aclresult, OBJECT_DATABASE,
704  get_database_name(MyDatabaseId));
705 
706  if (form->puballtables && !superuser_arg(newOwnerId))
707  ereport(ERROR,
708  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
709  errmsg("permission denied to change owner of publication \"%s\"",
710  NameStr(form->pubname)),
711  errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
712  }
713 
714  form->pubowner = newOwnerId;
715  CatalogTupleUpdate(rel, &tup->t_self, tup);
716 
717  /* Update owner dependency reference */
718  changeDependencyOnOwner(PublicationRelationId,
719  form->oid,
720  newOwnerId);
721 
722  InvokeObjectPostAlterHook(PublicationRelationId,
723  form->oid, 0);
724 }
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Oid GetUserId(void)
Definition: miscinit.c:439
bool pg_publication_ownercheck(Oid pub_oid, Oid roleid)
Definition: aclchk.c:5291
int errcode(int sqlerrcode)
Definition: elog.c:610
bool superuser(void)
Definition: superuser.c:46
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:309
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3352
#define ERROR
Definition: elog.h:43
#define ACL_CREATE
Definition: parsenodes.h:84
ItemPointerData t_self
Definition: htup.h:65
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
void check_is_member_of_role(Oid member, Oid role)
Definition: acl.c:4946
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
AclResult
Definition: acl.h:177
Oid MyDatabaseId
Definition: globals.c:85
#define ereport(elevel,...)
Definition: elog.h:144
AclResult pg_database_aclcheck(Oid db_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4641
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:224
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define NameStr(name)
Definition: c.h:615
FormData_pg_publication * Form_pg_publication

◆ AlterPublicationOwner_oid()

void AlterPublicationOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 765 of file publicationcmds.c.

References AlterPublicationOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONOID, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by shdepReassignOwned().

766 {
767  HeapTuple tup;
768  Relation rel;
769 
770  rel = table_open(PublicationRelationId, RowExclusiveLock);
771 
772  tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
773 
774  if (!HeapTupleIsValid(tup))
775  ereport(ERROR,
776  (errcode(ERRCODE_UNDEFINED_OBJECT),
777  errmsg("publication with OID %u does not exist", subid)));
778 
779  AlterPublicationOwner_internal(rel, tup, newOwnerId);
780 
781  heap_freetuple(tup);
782 
783  table_close(rel, RowExclusiveLock);
784 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errcode(int sqlerrcode)
Definition: elog.c:610
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define ereport(elevel,...)
Definition: elog.h:144
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:824
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationTables()

static void AlterPublicationTables ( AlterPublicationStmt * stmt,
Relation  rel,
HeapTuple  tup 
)
static

Definition at line 341 of file publicationcmds.c.

References Assert, CloseTableList(), DEFELEM_ADD, DEFELEM_DROP, ereport, errcode(), errdetail(), errmsg(), ERROR, GetPublicationRelations(), GETSTRUCT, lappend(), lfirst, lfirst_oid, list_length(), NameStr, NIL, OpenTableList(), PUBLICATION_PART_ROOT, PublicationAddTables(), PublicationDropTables(), RelationGetRelid, ShareUpdateExclusiveLock, table_open(), AlterPublicationStmt::tableAction, and AlterPublicationStmt::tables.

Referenced by AlterPublication().

343 {
344  List *rels = NIL;
345  Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
346  Oid pubid = pubform->oid;
347 
348  /* Check that user is allowed to manipulate the publication tables. */
349  if (pubform->puballtables)
350  ereport(ERROR,
351  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
352  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
353  NameStr(pubform->pubname)),
354  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
355 
356  Assert(list_length(stmt->tables) > 0);
357 
358  rels = OpenTableList(stmt->tables);
359 
360  if (stmt->tableAction == DEFELEM_ADD)
361  PublicationAddTables(pubid, rels, false, stmt);
362  else if (stmt->tableAction == DEFELEM_DROP)
363  PublicationDropTables(pubid, rels, false);
364  else /* DEFELEM_SET */
365  {
366  List *oldrelids = GetPublicationRelations(pubid,
367  PUBLICATION_PART_ROOT);
368  List *delrels = NIL;
369  ListCell *oldlc;
370 
371  /* Calculate which relations to drop. */
372  foreach(oldlc, oldrelids)
373  {
374  Oid oldrelid = lfirst_oid(oldlc);
375  ListCell *newlc;
376  bool found = false;
377 
378  foreach(newlc, rels)
379  {
380  Relation newrel = (Relation) lfirst(newlc);
381 
382  if (RelationGetRelid(newrel) == oldrelid)
383  {
384  found = true;
385  break;
386  }
387  }
388 
389  if (!found)
390  {
391  Relation oldrel = table_open(oldrelid,
392  ShareUpdateExclusiveLock);
393 
394  delrels = lappend(delrels, oldrel);
395  }
396  }
397 
398  /* And drop them. */
399  PublicationDropTables(pubid, delrels, true);
400 
401  /*
402  * Don't bother calculating the difference for adding, we'll catch and
403  * skip existing ones when doing catalog update.
404  */
405  PublicationAddTables(pubid, rels, true, stmt);
406 
407  CloseTableList(delrels);
408  }
409 
410  CloseTableList(rels);
411 }
#define NIL
Definition: pg_list.h:65
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
int errcode(int sqlerrcode)
Definition: elog.c:610
unsigned int Oid
Definition: postgres_ext.h:31
static void CloseTableList(List *rels)
struct RelationData * Relation
Definition: relcache.h:26
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:957
List * lappend(List *list, void *datum)
Definition: list.c:322
DefElemAction tableAction
Definition: parsenodes.h:3527
#define ereport(elevel,...)
Definition: elog.h:144
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
#define Assert(condition)
Definition: c.h:738
#define lfirst(lc)
Definition: pg_list.h:190
static int list_length(const List *l)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define NameStr(name)
Definition: c.h:615
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:435
static List * OpenTableList(List *tables)
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ CloseTableList()

static void CloseTableList ( List * rels)
static

Definition at line 596 of file publicationcmds.c.

References lfirst, NoLock, and table_close().

Referenced by AlterPublicationTables(), and CreatePublication().

597 {
598  ListCell *lc;
599 
600  foreach(lc, rels)
601  {
602  Relation rel = (Relation) lfirst(lc);
603 
604  table_close(rel, NoLock);
605  }
606 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
struct RelationData * Relation
Definition: relcache.h:26
#define NoLock
Definition: lockdefs.h:34
#define lfirst(lc)
Definition: pg_list.h:190

◆ CreatePublication()

ObjectAddress CreatePublication ( CreatePublicationStmt * stmt)

Definition at line 137 of file publicationcmds.c.

References ACL_CREATE, aclcheck_error(), ACLCHECK_OK, Assert, BoolGetDatum, CatalogTupleInsert(), CloseTableList(), CommandCounterIncrement(), CStringGetDatum, DirectFunctionCall1, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg(), ERROR, CreatePublicationStmt::for_all_tables, get_database_name(), GetNewOidWithIndex(), GetSysCacheOid1, GetUserId(), heap_form_tuple(), heap_freetuple(), InvokeObjectPostCreateHook, list_length(), MyDatabaseId, namein(), OBJECT_DATABASE, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, OpenTableList(), CreatePublicationStmt::options, parse_publication_options(), pg_database_aclcheck(), PublicationAddTables(), PUBLICATIONNAME, PublicationObjectIndexId, CreatePublicationStmt::pubname, recordDependencyOnOwner(), RelationGetDescr, RowExclusiveLock, superuser(), table_close(), table_open(), CreatePublicationStmt::tables, values, wal_level, WAL_LEVEL_LOGICAL, and WARNING.

Referenced by ProcessUtilitySlow().

138 {
139  Relation rel;
140  ObjectAddress myself;
141  Oid puboid;
142  bool nulls[Natts_pg_publication];
143  Datum values[Natts_pg_publication];
144  HeapTuple tup;
145  bool publish_given;
146  bool publish_insert;
147  bool publish_update;
148  bool publish_delete;
149  bool publish_truncate;
150  AclResult aclresult;
151 
152  /* must have CREATE privilege on database */
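153  aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
154  if (aclresult != ACLCHECK_OK)
155  aclcheck_error(aclresult, OBJECT_DATABASE,
156  get_database_name(MyDatabaseId));
157 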
158  /* FOR ALL TABLES requires superuser */
159  if (stmt->for_all_tables && !superuser())
160  ereport(ERROR,
161  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
162  errmsg("must be superuser to create FOR ALL TABLES publication")));
163 
164  rel = table_open(PublicationRelationId, RowExclusiveLock);
165 
166  /* Check if name is used */
167  puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
168  CStringGetDatum(stmt->pubname));
169  if (OidIsValid(puboid))
170  {
171  ereport(ERROR,
172  (errcode(ERRCODE_DUPLICATE_OBJECT),
173  errmsg("publication \"%s\" already exists",
174  stmt->pubname)));
175  }
176 
177  /* Form a tuple. */
178  memset(values, 0, sizeof(values));
179  memset(nulls, false, sizeof(nulls));
180 
181  values[Anum_pg_publication_pubname - 1] =
182  DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
183  values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
184 
185  parse_publication_options(stmt->options,
186  &publish_given, &publish_insert,
187  &publish_update, &publish_delete,
188  &publish_truncate);
189 
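190  puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
191  Anum_pg_publication_oid);
192  values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
193  values[Anum_pg_publication_puballtables - 1] =
194  BoolGetDatum(stmt->for_all_tables);
195  values[Anum_pg_publication_pubinsert - 1] =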
196  BoolGetDatum(publish_insert);
197  values[Anum_pg_publication_pubupdate - 1] =
198  BoolGetDatum(publish_update);
199  values[Anum_pg_publication_pubdelete - 1] =
200  BoolGetDatum(publish_delete);
201  values[Anum_pg_publication_pubtruncate - 1] =
202  BoolGetDatum(publish_truncate);
203 
204  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
205 
206  /* Insert tuple into catalog. */
207  CatalogTupleInsert(rel, tup);
208  heap_freetuple(tup);
209 
210  recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
211 
212  ObjectAddressSet(myself, PublicationRelationId, puboid);
213 
214  /* Make the changes visible. */
215  CommandCounterIncrement();
216 
217  if (stmt->tables)
218  {
219  List *rels;
220 
221  Assert(list_length(stmt->tables) > 0);
222 
223  rels = OpenTableList(stmt->tables);
224  PublicationAddTables(puboid, rels, true, NULL);
225  CloseTableList(rels);
226  }
227 
228  table_close(rel, RowExclusiveLock);
229 
230  InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
231 
232  if (wal_level != WAL_LEVEL_LOGICAL)
233  {
234  ereport(WARNING,
235  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
236  errmsg("wal_level is insufficient to publish logical changes"),
237  errhint("Set wal_level to logical before creating subscriptions.")));
238  }
239 
240  return myself;
241 }
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:317
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
#define RelationGetDescr(relation)
Definition: rel.h:461
Oid GetUserId(void)
Definition: miscinit.c:439
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:192
int wal_level
Definition: xlog.c:104
int errcode(int sqlerrcode)
Definition: elog.c:610
bool superuser(void)
Definition: superuser.c:46
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:615
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:164
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:644
static void CloseTableList(List *rels)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3352
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define PublicationObjectIndexId
Definition: indexing.h:346
#define ACL_CREATE
Definition: parsenodes.h:84
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
#define WARNING
Definition: elog.h:40
AclResult
Definition: acl.h:177
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1005
Oid MyDatabaseId
Definition: globals.c:85
#define BoolGetDatum(X)
Definition: postgres.h:402
#define ereport(elevel,...)
Definition: elog.h:144
AclResult pg_database_aclcheck(Oid db_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4641
#define Assert(condition)
Definition: c.h:738
static void parse_publication_options(List *options, bool *publish_given, bool *publish_insert, bool *publish_update, bool *publish_delete, bool *publish_truncate)
static int list_length(const List *l)
Definition: pg_list.h:169
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:824
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:31
Definition: pg_list.h:50
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:183
static List * OpenTableList(List *tables)

◆ OpenTableList()

static List * OpenTableList ( List * tables)
static

Definition at line 513 of file publicationcmds.c.

References castNode, CHECK_FOR_INTERRUPTS, find_all_inheritors(), RangeVar::inh, lappend(), lappend_oid(), lfirst, lfirst_oid, list_free(), list_member_oid(), NIL, NoLock, RelationData::rd_rel, RelationGetRelid, ShareUpdateExclusiveLock, table_close(), table_open(), and table_openrv().

Referenced by AlterPublicationTables(), and CreatePublication().

514 {
515  List *relids = NIL;
516  List *rels = NIL;
517  ListCell *lc;
518 
519  /*
520  * Open, share-lock, and check all the explicitly-specified relations
521  */
522  foreach(lc, tables)
523  {
524  RangeVar *rv = castNode(RangeVar, lfirst(lc));
525  bool recurse = rv->inh;
526  Relation rel;
527  Oid myrelid;
528 
529  /* Allow query cancel in case this takes a long time */
530  CHECK_FOR_INTERRUPTS();
531 
532  rel = table_openrv(rv, ShareUpdateExclusiveLock);
533  myrelid = RelationGetRelid(rel);
534 
535  /*
536  * Filter out duplicates if user specifies "foo, foo".
537  *
538  * Note that this algorithm is known to not be very efficient (O(N^2))
539  * but given that it only works on list of tables given to us by user
540  * it's deemed acceptable.
541  */
542  if (list_member_oid(relids, myrelid))
543  {
544  table_close(rel, ShareUpdateExclusiveLock);
545  continue;
546  }
547 
548  rels = lappend(rels, rel);
549  relids = lappend_oid(relids, myrelid);
550 
551  /*
552  * Add children of this rel, if requested, so that they too are added
553  * to the publication. A partitioned table can't have any inheritance
554  * children other than its partitions, which need not be explicitly
555  * added to the publication.
556  */
557  if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
558  {
559  List *children;
560  ListCell *child;
561 
562  children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
563  NULL);
564 
565  foreach(child, children)
566  {
567  Oid childrelid = lfirst_oid(child);
568 
569  /* Allow query cancel in case this takes a long time */
570  CHECK_FOR_INTERRUPTS();
571 
572  /*
573  * Skip duplicates if user specified both parent and child
574  * tables.
575  */
576  if (list_member_oid(relids, childrelid))
577  continue;
578 
579  /* find_all_inheritors already got lock */
580  rel = table_open(childrelid, NoLock);
581  rels = lappend(rels, rel);
582  relids = lappend_oid(relids, childrelid);
583  }
584  }
585  }
586 
587  list_free(relids);
588 
589  return rels;
590 }
#define NIL
Definition: pg_list.h:65
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define castNode(_type_, nodeptr)
Definition: nodes.h:595
Form_pg_class rd_rel
Definition: rel.h:89
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:358
#define NoLock
Definition: lockdefs.h:34
bool inh
Definition: primnodes.h:69
List * lappend(List *list, void *datum)
Definition: list.c:322
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: table.c:68
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:675
#define lfirst(lc)
Definition: pg_list.h:190
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:165
void list_free(List *list)
Definition: list.c:1377
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:435
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ parse_publication_options()

static void parse_publication_options ( List * options,
bool * publish_given,
bool * publish_insert,
bool * publish_update,
bool * publish_delete,
bool * publish_truncate 
)
static

Definition at line 57 of file publicationcmds.c.

References defGetString(), DefElem::defname, ereport, errcode(), errmsg(), ERROR, lfirst, and SplitIdentifierString().

Referenced by AlterPublicationOptions(), and CreatePublication().

63 {
64  ListCell *lc;
65 
66  *publish_given = false;
67 
68  /* Defaults are true */
69  *publish_insert = true;
70  *publish_update = true;
71  *publish_delete = true;
72  *publish_truncate = true;
73 
74  /* Parse options */
75  foreach(lc, options)
76  {
77  DefElem *defel = (DefElem *) lfirst(lc);
78 
79  if (strcmp(defel->defname, "publish") == 0)
80  {
81  char *publish;
82  List *publish_list;
83  ListCell *lc;
84 
85  if (*publish_given)
86  ereport(ERROR,
87  (errcode(ERRCODE_SYNTAX_ERROR),
88  errmsg("conflicting or redundant options")));
89 
90  /*
91  * If publish option was given only the explicitly listed actions
92  * should be published.
93  */
94  *publish_insert = false;
95  *publish_update = false;
96  *publish_delete = false;
97  *publish_truncate = false;
98 
99  *publish_given = true;
100  publish = defGetString(defel);
101 
102  if (!SplitIdentifierString(publish, ',', &publish_list))
103  ereport(ERROR,
104  (errcode(ERRCODE_SYNTAX_ERROR),
105  errmsg("invalid list syntax for \"publish\" option")));
106 
107  /* Process the option list. */
108  foreach(lc, publish_list)
109  {
110  char *publish_opt = (char *) lfirst(lc);
111 
112  if (strcmp(publish_opt, "insert") == 0)
113  *publish_insert = true;
114  else if (strcmp(publish_opt, "update") == 0)
115  *publish_update = true;
116  else if (strcmp(publish_opt, "delete") == 0)
117  *publish_delete = true;
118  else if (strcmp(publish_opt, "truncate") == 0)
119  *publish_truncate = true;
120  else
121  ereport(ERROR,
122  (errcode(ERRCODE_SYNTAX_ERROR),
123  errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
124  }
125  }
126  else
127  ereport(ERROR,
128  (errcode(ERRCODE_SYNTAX_ERROR),
129  errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
130  }
131 }
int errcode(int sqlerrcode)
Definition: elog.c:610
#define ERROR
Definition: elog.h:43
char * defGetString(DefElem *def)
Definition: define.c:49
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3672
#define ereport(elevel,...)
Definition: elog.h:144
#define lfirst(lc)
Definition: pg_list.h:190
int errmsg(const char *fmt,...)
Definition: elog.c:824
char * defname
Definition: parsenodes.h:730
Definition: pg_list.h:50

◆ PublicationAddTables()

static void PublicationAddTables ( Oid  pubid,
List * rels,
bool  if_not_exists,
AlterPublicationStmt * stmt 
)
static

Definition at line 612 of file publicationcmds.c.

References aclcheck_error(), ACLCHECK_NOT_OWNER, Assert, EventTriggerCollectSimpleCommand(), AlterPublicationStmt::for_all_tables, get_relkind_objtype(), GetUserId(), InvalidObjectAddress, InvokeObjectPostCreateHook, lfirst, ObjectAddress::objectId, pg_class_ownercheck(), publication_add_relation(), RelationData::rd_rel, RelationGetRelationName, and RelationGetRelid.

Referenced by AlterPublicationTables(), and CreatePublication().

614 {
615  ListCell *lc;
616 
617  Assert(!stmt || !stmt->for_all_tables);
618 
619  foreach(lc, rels)
620  {
621  Relation rel = (Relation) lfirst(lc);
622  ObjectAddress obj;
623 
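624  /* Must be owner of the table or superuser. */
625  if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
626  aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
627  RelationGetRelationName(rel));
628 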
629  obj = publication_add_relation(pubid, rel, if_not_exists);
630  if (stmt)
631  {
632  EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
633  (Node *) stmt);
634 
635  InvokeObjectPostCreateHook(PublicationRelRelationId,
636  obj.objectId, 0);
637  }
638  }
639 }
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
Oid GetUserId(void)
Definition: miscinit.c:439
Definition: nodes.h:526
Form_pg_class rd_rel
Definition: rel.h:89
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3352
struct RelationData * Relation
Definition: relcache.h:26
#define RelationGetRelationName(relation)
Definition: rel.h:469
#define Assert(condition)
Definition: c.h:738
#define lfirst(lc)
Definition: pg_list.h:190
bool pg_class_ownercheck(Oid class_oid, Oid roleid)
Definition: aclchk.c:4753
const ObjectAddress InvalidObjectAddress
ObjectType get_relkind_objtype(char relkind)
ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists)
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define RelationGetRelid(relation)
Definition: rel.h:435

◆ PublicationDropTables()

static void PublicationDropTables ( Oid  pubid,
List * rels,
bool  missing_ok 
)
static

Definition at line 645 of file publicationcmds.c.

References DROP_CASCADE, ereport, errcode(), errmsg(), ERROR, GetSysCacheOid2, lfirst, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, performDeletion(), PUBLICATIONRELMAP, RelationGetRelationName, and RelationGetRelid.

Referenced by AlterPublicationTables().

646 {
647  ObjectAddress obj;
648  ListCell *lc;
649  Oid prid;
650 
651  foreach(lc, rels)
652  {
653  Relation rel = (Relation) lfirst(lc);
654  Oid relid = RelationGetRelid(rel);
655 
656  prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
657  ObjectIdGetDatum(relid),
658  ObjectIdGetDatum(pubid));
659  if (!OidIsValid(prid))
660  {
661  if (missing_ok)
662  continue;
663 
664  ereport(ERROR,
665  (errcode(ERRCODE_UNDEFINED_OBJECT),
666  errmsg("relation \"%s\" is not part of the publication",
667  RelationGetRelationName(rel))));
668  }
669 
670  ObjectAddressSet(obj, PublicationRelRelationId, prid);
671  performDeletion(&obj, DROP_CASCADE, 0);
672  }
673 }
int errcode(int sqlerrcode)
Definition: elog.c:610
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:644
struct RelationData * Relation
Definition: relcache.h:26
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:316
#define RelationGetRelationName(relation)
Definition: rel.h:469
#define ereport(elevel,...)
Definition: elog.h:144
#define lfirst(lc)
Definition: pg_list.h:190
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:194
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define RelationGetRelid(relation)
Definition: rel.h:435

◆ RemovePublicationById()

void RemovePublicationById ( Oid  pubid)

Definition at line 458 of file publicationcmds.c.

References CatalogTupleDelete(), elog, ERROR, HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONOID, ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().

459 {
460  Relation rel;
461  HeapTuple tup;
462 
463  rel = table_open(PublicationRelationId, RowExclusiveLock);
464 
465  tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
466 
467  if (!HeapTupleIsValid(tup))
468  elog(ERROR, "cache lookup failed for publication %u", pubid);
469 
470  CatalogTupleDelete(rel, &tup->t_self);
471 
472  ReleaseSysCache(tup);
473 
474  table_close(rel, RowExclusiveLock);
475 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:269
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define RowExclusiveLock
Definition: lockdefs.h:38
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define elog(elevel,...)
Definition: elog.h:214
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ RemovePublicationRelById()

void RemovePublicationRelById ( Oid  proid)

Definition at line 481 of file publicationcmds.c.

References CacheInvalidateRelcacheByRelid(), CatalogTupleDelete(), elog, ERROR, GETSTRUCT, HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONREL, ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().

482 {
483  Relation rel;
484  HeapTuple tup;
485  Form_pg_publication_rel pubrel;
486 
487  rel = table_open(PublicationRelRelationId, RowExclusiveLock);
488 
489  tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
490 
491  if (!HeapTupleIsValid(tup))
492  elog(ERROR, "cache lookup failed for publication table %u",
493  proid);
494 
495  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
496 
497  /* Invalidate relcache so that publication info is rebuilt. */
498  CacheInvalidateRelcacheByRelid(pubrel->prrelid);
499 
500  CatalogTupleDelete(rel, &tup->t_self);
501 
502  ReleaseSysCache(tup);
503 
504  table_close(rel, RowExclusiveLock);
505 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:269
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define RowExclusiveLock
Definition: lockdefs.h:38
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1329
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define elog(elevel,...)
Definition: elog.h:214
FormData_pg_publication_rel * Form_pg_publication_rel
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39