PostgreSQL Source Code  git master
publicationcmds.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/publicationcmds.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
Include dependency graph for publicationcmds.c:

Go to the source code of this file.

Macros

#define MAX_RELCACHE_INVAL_MSGS   4096
 

Functions

static List * OpenTableList (List *tables)
 
static void CloseTableList (List *rels)
 
static void PublicationAddTables (Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
 
static void PublicationDropTables (Oid pubid, List *rels, bool missing_ok)
 
static void parse_publication_options (List *options, bool *publish_given, bool *publish_insert, bool *publish_update, bool *publish_delete, bool *publish_truncate)
 
ObjectAddress CreatePublication (CreatePublicationStmt *stmt)
 
static void AlterPublicationOptions (AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
 
static void AlterPublicationTables (AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
 
void AlterPublication (AlterPublicationStmt *stmt)
 
void RemovePublicationById (Oid pubid)
 
void RemovePublicationRelById (Oid proid)
 
static void AlterPublicationOwner_internal (Relation rel, HeapTuple tup, Oid newOwnerId)
 
ObjectAddress AlterPublicationOwner (const char *name, Oid newOwnerId)
 
void AlterPublicationOwner_oid (Oid subid, Oid newOwnerId)
 

Macro Definition Documentation

◆ MAX_RELCACHE_INVAL_MSGS

#define MAX_RELCACHE_INVAL_MSGS   4096

Definition at line 47 of file publicationcmds.c.

Referenced by AlterPublicationOptions().

Function Documentation

◆ AlterPublication()

void AlterPublication ( AlterPublicationStmt * stmt)

Definition at line 412 of file publicationcmds.c.

References aclcheck_error(), ACLCHECK_NOT_OWNER, AlterPublicationOptions(), AlterPublicationTables(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, GetUserId(), heap_freetuple(), HeapTupleIsValid, OBJECT_PUBLICATION, AlterPublicationStmt::options, pg_publication_ownercheck(), PUBLICATIONNAME, AlterPublicationStmt::pubname, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ProcessUtilitySlow().

413 {
414  Relation rel;
415  HeapTuple tup;
416  Form_pg_publication pubform;
417 
418  rel = table_open(PublicationRelationId, RowExclusiveLock);
419 
420  tup = SearchSysCacheCopy1(PUBLICATIONNAME,
421  CStringGetDatum(stmt->pubname));
422 
423  if (!HeapTupleIsValid(tup))
424  ereport(ERROR,
425  (errcode(ERRCODE_UNDEFINED_OBJECT),
426  errmsg("publication \"%s\" does not exist",
427  stmt->pubname)));
428 
429  pubform = (Form_pg_publication) GETSTRUCT(tup);
430 
431  /* must be owner */
432  if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
433  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
434  stmt->pubname);
435 
436  if (stmt->options)
437  AlterPublicationOptions(stmt, rel, tup);
438  else
439  AlterPublicationTables(stmt, rel, tup);
440 
441  /* Cleanup. */
442  heap_freetuple(tup);
443  table_close(rel, RowExclusiveLock);
444 }
static void AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Oid GetUserId(void)
Definition: miscinit.c:380
bool pg_publication_ownercheck(Oid pub_oid, Oid roleid)
Definition: aclchk.c:5291
int errcode(int sqlerrcode)
Definition: elog.c:608
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3352
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
#define ereport(elevel, rest)
Definition: elog.h:141
static void AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:822
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationOptions()

static void AlterPublicationOptions ( AlterPublicationStmt * stmt,
Relation  rel,
HeapTuple  tup 
)
static

Definition at line 246 of file publicationcmds.c.

References BoolGetDatum, CacheInvalidateRelcacheAll(), CacheInvalidateRelcacheByRelid(), CatalogTupleUpdate(), CommandCounterIncrement(), EventTriggerCollectSimpleCommand(), GetPublicationRelations(), GETSTRUCT, heap_modify_tuple(), InvalidObjectAddress, InvokeObjectPostAlterHook, lfirst_oid, list_length(), MAX_RELCACHE_INVAL_MSGS, ObjectAddressSet, AlterPublicationStmt::options, parse_publication_options(), RelationGetDescr, HeapTupleData::t_self, and values.

Referenced by AlterPublication().

248 {
249  bool nulls[Natts_pg_publication];
250  bool replaces[Natts_pg_publication];
251  Datum values[Natts_pg_publication];
252  bool publish_given;
253  bool publish_insert;
254  bool publish_update;
255  bool publish_delete;
256  bool publish_truncate;
257  ObjectAddress obj;
258  Form_pg_publication pubform;
259 
260  parse_publication_options(stmt->options,
261  &publish_given, &publish_insert,
262  &publish_update, &publish_delete,
263  &publish_truncate);
264 
265  /* Everything ok, form a new tuple. */
266  memset(values, 0, sizeof(values));
267  memset(nulls, false, sizeof(nulls));
268  memset(replaces, false, sizeof(replaces));
269 
270  if (publish_given)
271  {
272  values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
273  replaces[Anum_pg_publication_pubinsert - 1] = true;
274 
275  values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
276  replaces[Anum_pg_publication_pubupdate - 1] = true;
277 
278  values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
279  replaces[Anum_pg_publication_pubdelete - 1] = true;
280 
281  values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
282  replaces[Anum_pg_publication_pubtruncate - 1] = true;
283  }
284 
285  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
286  replaces);
287 
288  /* Update the catalog. */
289  CatalogTupleUpdate(rel, &tup->t_self, tup);
290 
291  CommandCounterIncrement();
292 
293  pubform = (Form_pg_publication) GETSTRUCT(tup);
294 
295  /* Invalidate the relcache. */
296  if (pubform->puballtables)
297  {
298  CacheInvalidateRelcacheAll();
299  }
300  else
301  {
302  List *relids = GetPublicationRelations(pubform->oid);
303 
304  /*
305  * We don't want to send too many individual messages, at some point
306  * it's cheaper to just reset whole relcache.
307  */
308  if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
309  {
310  ListCell *lc;
311 
312  foreach(lc, relids)
313  {
314  Oid relid = lfirst_oid(lc);
315 
316  CacheInvalidateRelcacheByRelid(relid);
317  }
318  }
319  else
320  CacheInvalidateRelcacheAll();
321  }
322 
323  ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
324  EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
325  (Node *) stmt);
326 
327  InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
328 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define RelationGetDescr(relation)
Definition: rel.h:448
Definition: nodes.h:525
unsigned int Oid
Definition: postgres_ext.h:31
#define MAX_RELCACHE_INVAL_MSGS
ItemPointerData t_self
Definition: htup.h:65
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1329
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
void CacheInvalidateRelcacheAll(void)
Definition: inval.c:1294
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1005
#define BoolGetDatum(X)
Definition: postgres.h:402
List * GetPublicationRelations(Oid pubid)
static void parse_publication_options(List *options, bool *publish_given, bool *publish_insert, bool *publish_update, bool *publish_delete, bool *publish_truncate)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:224
static int list_length(const List *l)
Definition: pg_list.h:169
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
const ObjectAddress InvalidObjectAddress
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
FormData_pg_publication * Form_pg_publication
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
Definition: pg_list.h:50
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ AlterPublicationOwner()

ObjectAddress AlterPublicationOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 716 of file publicationcmds.c.

References AlterPublicationOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), HeapTupleIsValid, ObjectAddressSet, PUBLICATIONNAME, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

717 {
718  Oid subid;
719  HeapTuple tup;
720  Relation rel;
721  ObjectAddress address;
722  Form_pg_publication pubform;
723 
724  rel = table_open(PublicationRelationId, RowExclusiveLock);
725 
726  tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
727 
728  if (!HeapTupleIsValid(tup))
729  ereport(ERROR,
730  (errcode(ERRCODE_UNDEFINED_OBJECT),
731  errmsg("publication \"%s\" does not exist", name)));
732 
733  pubform = (Form_pg_publication) GETSTRUCT(tup);
734  subid = pubform->oid;
735 
736  AlterPublicationOwner_internal(rel, tup, newOwnerId);
737 
738  ObjectAddressSet(address, PublicationRelationId, subid);
739 
740  heap_freetuple(tup);
741 
742  table_close(rel, RowExclusiveLock);
743 
744  return address;
745 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
int errcode(int sqlerrcode)
Definition: elog.c:608
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
#define ereport(elevel, rest)
Definition: elog.h:141
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
const char * name
Definition: encode.c:521
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:822
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationOwner_internal()

static void AlterPublicationOwner_internal ( Relation  rel,
HeapTuple  tup,
Oid  newOwnerId 
)
static

Definition at line 665 of file publicationcmds.c.

References ACL_CREATE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, CatalogTupleUpdate(), changeDependencyOnOwner(), check_is_member_of_role(), ereport, errcode(), errhint(), errmsg(), ERROR, get_database_name(), GETSTRUCT, GetUserId(), InvokeObjectPostAlterHook, MyDatabaseId, NameStr, OBJECT_DATABASE, OBJECT_PUBLICATION, pg_database_aclcheck(), pg_publication_ownercheck(), superuser(), superuser_arg(), and HeapTupleData::t_self.

Referenced by AlterPublicationOwner(), and AlterPublicationOwner_oid().

666 {
667  Form_pg_publication form;
668 
669  form = (Form_pg_publication) GETSTRUCT(tup);
670 
671  if (form->pubowner == newOwnerId)
672  return;
673 
674  if (!superuser())
675  {
676  AclResult aclresult;
677 
678  /* Must be owner */
679  if (!pg_publication_ownercheck(form->oid, GetUserId()))
680  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
681  NameStr(form->pubname));
682 
683  /* Must be able to become new owner */
684  check_is_member_of_role(GetUserId(), newOwnerId);
685 
686  /* New owner must have CREATE privilege on database */
687  aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
688  if (aclresult != ACLCHECK_OK)
689  aclcheck_error(aclresult, OBJECT_DATABASE,
690  get_database_name(MyDatabaseId));
691 
692  if (form->puballtables && !superuser_arg(newOwnerId))
693  ereport(ERROR,
694  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
695  errmsg("permission denied to change owner of publication \"%s\"",
696  NameStr(form->pubname)),
697  errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
698  }
699 
700  form->pubowner = newOwnerId;
701  CatalogTupleUpdate(rel, &tup->t_self, tup);
702 
703  /* Update owner dependency reference */
704  changeDependencyOnOwner(PublicationRelationId,
705  form->oid,
706  newOwnerId);
707 
708  InvokeObjectPostAlterHook(PublicationRelationId,
709  form->oid, 0);
710 }
int errhint(const char *fmt,...)
Definition: elog.c:1069
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Oid GetUserId(void)
Definition: miscinit.c:380
bool pg_publication_ownercheck(Oid pub_oid, Oid roleid)
Definition: aclchk.c:5291
int errcode(int sqlerrcode)
Definition: elog.c:608
bool superuser(void)
Definition: superuser.c:46
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:309
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3352
#define ERROR
Definition: elog.h:43
#define ACL_CREATE
Definition: parsenodes.h:84
ItemPointerData t_self
Definition: htup.h:65
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
void check_is_member_of_role(Oid member, Oid role)
Definition: acl.c:4946
#define ereport(elevel, rest)
Definition: elog.h:141
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
AclResult
Definition: acl.h:177
Oid MyDatabaseId
Definition: globals.c:85
AclResult pg_database_aclcheck(Oid db_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4641
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:224
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define NameStr(name)
Definition: c.h:616
FormData_pg_publication * Form_pg_publication

◆ AlterPublicationOwner_oid()

void AlterPublicationOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 751 of file publicationcmds.c.

References AlterPublicationOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONOID, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by shdepReassignOwned().

752 {
753  HeapTuple tup;
754  Relation rel;
755 
756  rel = table_open(PublicationRelationId, RowExclusiveLock);
757 
758  tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
759 
760  if (!HeapTupleIsValid(tup))
761  ereport(ERROR,
762  (errcode(ERRCODE_UNDEFINED_OBJECT),
763  errmsg("publication with OID %u does not exist", subid)));
764 
765  AlterPublicationOwner_internal(rel, tup, newOwnerId);
766 
767  heap_freetuple(tup);
768 
769  table_close(rel, RowExclusiveLock);
770 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errcode(int sqlerrcode)
Definition: elog.c:608
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define ereport(elevel, rest)
Definition: elog.h:141
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:822
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationTables()

static void AlterPublicationTables ( AlterPublicationStmt * stmt,
Relation  rel,
HeapTuple  tup 
)
static

Definition at line 334 of file publicationcmds.c.

References Assert, CloseTableList(), DEFELEM_ADD, DEFELEM_DROP, ereport, errcode(), errdetail(), errmsg(), ERROR, GetPublicationRelations(), GETSTRUCT, lappend(), lfirst, lfirst_oid, list_length(), NameStr, NIL, OpenTableList(), PublicationAddTables(), PublicationDropTables(), RelationGetRelid, ShareUpdateExclusiveLock, table_open(), AlterPublicationStmt::tableAction, and AlterPublicationStmt::tables.

Referenced by AlterPublication().

336 {
337  List *rels = NIL;
338  Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
339  Oid pubid = pubform->oid;
340 
341  /* Check that user is allowed to manipulate the publication tables. */
342  if (pubform->puballtables)
343  ereport(ERROR,
344  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
345  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
346  NameStr(pubform->pubname)),
347  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
348 
349  Assert(list_length(stmt->tables) > 0);
350 
351  rels = OpenTableList(stmt->tables);
352 
353  if (stmt->tableAction == DEFELEM_ADD)
354  PublicationAddTables(pubid, rels, false, stmt);
355  else if (stmt->tableAction == DEFELEM_DROP)
356  PublicationDropTables(pubid, rels, false);
357  else /* DEFELEM_SET */
358  {
359  List *oldrelids = GetPublicationRelations(pubid);
360  List *delrels = NIL;
361  ListCell *oldlc;
362 
363  /* Calculate which relations to drop. */
364  foreach(oldlc, oldrelids)
365  {
366  Oid oldrelid = lfirst_oid(oldlc);
367  ListCell *newlc;
368  bool found = false;
369 
370  foreach(newlc, rels)
371  {
372  Relation newrel = (Relation) lfirst(newlc);
373 
374  if (RelationGetRelid(newrel) == oldrelid)
375  {
376  found = true;
377  break;
378  }
379  }
380 
381  if (!found)
382  {
383  Relation oldrel = table_open(oldrelid,
384  ShareUpdateExclusiveLock);
385 
386  delrels = lappend(delrels, oldrel);
387  }
388  }
389 
390  /* And drop them. */
391  PublicationDropTables(pubid, delrels, true);
392 
393  /*
394  * Don't bother calculating the difference for adding, we'll catch and
395  * skip existing ones when doing catalog update.
396  */
397  PublicationAddTables(pubid, rels, true, stmt);
398 
399  CloseTableList(delrels);
400  }
401 
402  CloseTableList(rels);
403 }
#define NIL
Definition: pg_list.h:65
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
int errcode(int sqlerrcode)
Definition: elog.c:608
unsigned int Oid
Definition: postgres_ext.h:31
static void CloseTableList(List *rels)
struct RelationData * Relation
Definition: relcache.h:26
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:955
#define ereport(elevel, rest)
Definition: elog.h:141
List * lappend(List *list, void *datum)
Definition: list.c:322
DefElemAction tableAction
Definition: parsenodes.h:3498
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190
List * GetPublicationRelations(Oid pubid)
static int list_length(const List *l)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define NameStr(name)
Definition: c.h:616
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:422
static List * OpenTableList(List *tables)
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ CloseTableList()

static void CloseTableList ( List * rels)
static

Definition at line 582 of file publicationcmds.c.

References lfirst, NoLock, and table_close().

Referenced by AlterPublicationTables(), and CreatePublication().

583 {
584  ListCell *lc;
585 
586  foreach(lc, rels)
587  {
588  Relation rel = (Relation) lfirst(lc);
589 
590  table_close(rel, NoLock);
591  }
592 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
struct RelationData * Relation
Definition: relcache.h:26
#define NoLock
Definition: lockdefs.h:34
#define lfirst(lc)
Definition: pg_list.h:190

◆ CreatePublication()

ObjectAddress CreatePublication ( CreatePublicationStmt * stmt)

Definition at line 136 of file publicationcmds.c.

References ACL_CREATE, aclcheck_error(), ACLCHECK_OK, Assert, BoolGetDatum, CatalogTupleInsert(), CloseTableList(), CommandCounterIncrement(), CStringGetDatum, DirectFunctionCall1, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg(), ERROR, CreatePublicationStmt::for_all_tables, get_database_name(), GetNewOidWithIndex(), GetSysCacheOid1, GetUserId(), heap_form_tuple(), heap_freetuple(), InvokeObjectPostCreateHook, list_length(), MyDatabaseId, namein(), OBJECT_DATABASE, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, OpenTableList(), CreatePublicationStmt::options, parse_publication_options(), pg_database_aclcheck(), PublicationAddTables(), PUBLICATIONNAME, PublicationObjectIndexId, CreatePublicationStmt::pubname, recordDependencyOnOwner(), RelationGetDescr, RowExclusiveLock, superuser(), table_close(), table_open(), CreatePublicationStmt::tables, values, wal_level, WAL_LEVEL_LOGICAL, and WARNING.

Referenced by ProcessUtilitySlow().

137 {
138  Relation rel;
139  ObjectAddress myself;
140  Oid puboid;
141  bool nulls[Natts_pg_publication];
142  Datum values[Natts_pg_publication];
143  HeapTuple tup;
144  bool publish_given;
145  bool publish_insert;
146  bool publish_update;
147  bool publish_delete;
148  bool publish_truncate;
149  AclResult aclresult;
150 
151  /* must have CREATE privilege on database */
152  aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
153  if (aclresult != ACLCHECK_OK)
154  aclcheck_error(aclresult, OBJECT_DATABASE,
155  get_database_name(MyDatabaseId));
156 
157  /* FOR ALL TABLES requires superuser */
158  if (stmt->for_all_tables && !superuser())
159  ereport(ERROR,
160  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
161  (errmsg("must be superuser to create FOR ALL TABLES publication"))));
162 
163  rel = table_open(PublicationRelationId, RowExclusiveLock);
164 
165  /* Check if name is used */
166  puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
167  CStringGetDatum(stmt->pubname));
168  if (OidIsValid(puboid))
169  {
170  ereport(ERROR,
171  (errcode(ERRCODE_DUPLICATE_OBJECT),
172  errmsg("publication \"%s\" already exists",
173  stmt->pubname)));
174  }
175 
176  /* Form a tuple. */
177  memset(values, 0, sizeof(values));
178  memset(nulls, false, sizeof(nulls));
179 
180  values[Anum_pg_publication_pubname - 1] =
181  DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
182  values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
183 
184  parse_publication_options(stmt->options,
185  &publish_given, &publish_insert,
186  &publish_update, &publish_delete,
187  &publish_truncate);
188 
189  puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
190  Anum_pg_publication_oid);
191  values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
192  values[Anum_pg_publication_puballtables - 1] =
193  BoolGetDatum(stmt->for_all_tables);
194  values[Anum_pg_publication_pubinsert - 1] =
195  BoolGetDatum(publish_insert);
196  values[Anum_pg_publication_pubupdate - 1] =
197  BoolGetDatum(publish_update);
198  values[Anum_pg_publication_pubdelete - 1] =
199  BoolGetDatum(publish_delete);
200  values[Anum_pg_publication_pubtruncate - 1] =
201  BoolGetDatum(publish_truncate);
202 
203  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
204 
205  /* Insert tuple into catalog. */
206  CatalogTupleInsert(rel, tup);
207  heap_freetuple(tup);
208 
209  recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
210 
211  ObjectAddressSet(myself, PublicationRelationId, puboid);
212 
213  /* Make the changes visible. */
214  CommandCounterIncrement();
215 
216  if (stmt->tables)
217  {
218  List *rels;
219 
220  Assert(list_length(stmt->tables) > 0);
221 
222  rels = OpenTableList(stmt->tables);
223  PublicationAddTables(puboid, rels, true, NULL);
224  CloseTableList(rels);
225  }
226 
227  table_close(rel, RowExclusiveLock);
228 
229  InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
230 
231  if (wal_level != WAL_LEVEL_LOGICAL)
232  {
233  ereport(WARNING,
234  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
235  errmsg("wal_level is insufficient to publish logical changes"),
236  errhint("Set wal_level to logical before creating subscriptions.")));
237  }
238 
239  return myself;
240 }
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:322
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errhint(const char *fmt,...)
Definition: elog.c:1069
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
#define RelationGetDescr(relation)
Definition: rel.h:448
Oid GetUserId(void)
Definition: miscinit.c:380
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:192
int wal_level
Definition: xlog.c:103
int errcode(int sqlerrcode)
Definition: elog.c:608
bool superuser(void)
Definition: superuser.c:46
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:615
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:164
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:645
static void CloseTableList(List *rels)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3352
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define PublicationObjectIndexId
Definition: indexing.h:349
#define ACL_CREATE
Definition: parsenodes.h:84
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
#define ereport(elevel, rest)
Definition: elog.h:141
#define WARNING
Definition: elog.h:40
AclResult
Definition: acl.h:177
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1005
Oid MyDatabaseId
Definition: globals.c:85
#define BoolGetDatum(X)
Definition: postgres.h:402
AclResult pg_database_aclcheck(Oid db_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4641
#define Assert(condition)
Definition: c.h:739
static void parse_publication_options(List *options, bool *publish_given, bool *publish_insert, bool *publish_update, bool *publish_delete, bool *publish_truncate)
static int list_length(const List *l)
Definition: pg_list.h:169
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:822
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:31
Definition: pg_list.h:50
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:183
static List * OpenTableList(List *tables)

◆ OpenTableList()

static List * OpenTableList ( List * tables)
static

Definition at line 504 of file publicationcmds.c.

References castNode, CHECK_FOR_INTERRUPTS, find_all_inheritors(), RangeVar::inh, lappend(), lappend_oid(), lfirst, lfirst_oid, list_free(), list_member_oid(), NIL, NoLock, RelationGetRelid, ShareUpdateExclusiveLock, table_close(), table_open(), and table_openrv().

Referenced by AlterPublicationTables(), and CreatePublication().

505 {
506  List *relids = NIL;
507  List *rels = NIL;
508  ListCell *lc;
509 
510  /*
511  * Open, share-lock, and check all the explicitly-specified relations
512  */
513  foreach(lc, tables)
514  {
515  RangeVar *rv = castNode(RangeVar, lfirst(lc));
516  bool recurse = rv->inh;
517  Relation rel;
518  Oid myrelid;
519 
520  /* Allow query cancel in case this takes a long time */
521  CHECK_FOR_INTERRUPTS();
522 
523  rel = table_openrv(rv, ShareUpdateExclusiveLock);
524  myrelid = RelationGetRelid(rel);
525 
526  /*
527  * Filter out duplicates if user specifies "foo, foo".
528  *
529  * Note that this algorithm is known to not be very efficient (O(N^2))
530  * but given that it only works on list of tables given to us by user
531  * it's deemed acceptable.
532  */
533  if (list_member_oid(relids, myrelid))
534  {
535  table_close(rel, ShareUpdateExclusiveLock);
536  continue;
537  }
538 
539  rels = lappend(rels, rel);
540  relids = lappend_oid(relids, myrelid);
541 
542  /* Add children of this rel, if requested */
543  if (recurse)
544  {
545  List *children;
546  ListCell *child;
547 
548  children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
549  NULL);
550 
551  foreach(child, children)
552  {
553  Oid childrelid = lfirst_oid(child);
554 
555  /* Allow query cancel in case this takes a long time */
556  CHECK_FOR_INTERRUPTS();
557 
558  /*
559  * Skip duplicates if user specified both parent and child
560  * tables.
561  */
562  if (list_member_oid(relids, childrelid))
563  continue;
564 
565  /* find_all_inheritors already got lock */
566  rel = table_open(childrelid, NoLock);
567  rels = lappend(rels, rel);
568  relids = lappend_oid(relids, childrelid);
569  }
570  }
571  }
572 
573  list_free(relids);
574 
575  return rels;
576 }
#define NIL
Definition: pg_list.h:65
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define castNode(_type_, nodeptr)
Definition: nodes.h:594
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:358
#define NoLock
Definition: lockdefs.h:34
bool inh
Definition: primnodes.h:69
List * lappend(List *list, void *datum)
Definition: list.c:322
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: table.c:68
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:675
#define lfirst(lc)
Definition: pg_list.h:190
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:165
void list_free(List *list)
Definition: list.c:1377
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:422
#define lfirst_oid(lc)
Definition: pg_list.h:192

◆ parse_publication_options()

static void parse_publication_options ( List * options,
bool * publish_given,
bool * publish_insert,
bool * publish_update,
bool * publish_delete,
bool * publish_truncate 
)
static

Definition at line 56 of file publicationcmds.c.

References defGetString(), DefElem::defname, ereport, errcode(), errmsg(), ERROR, lfirst, and SplitIdentifierString().

Referenced by AlterPublicationOptions(), and CreatePublication().

62 {
63  ListCell *lc;
64 
65  *publish_given = false;
66 
67  /* Defaults are true */
68  *publish_insert = true;
69  *publish_update = true;
70  *publish_delete = true;
71  *publish_truncate = true;
72 
73  /* Parse options */
74  foreach(lc, options)
75  {
76  DefElem *defel = (DefElem *) lfirst(lc);
77 
78  if (strcmp(defel->defname, "publish") == 0)
79  {
80  char *publish;
81  List *publish_list;
82  ListCell *lc;
83 
84  if (*publish_given)
85  ereport(ERROR,
86  (errcode(ERRCODE_SYNTAX_ERROR),
87  errmsg("conflicting or redundant options")));
88 
89  /*
90  * If publish option was given only the explicitly listed actions
91  * should be published.
92  */
93  *publish_insert = false;
94  *publish_update = false;
95  *publish_delete = false;
96  *publish_truncate = false;
97 
98  *publish_given = true;
99  publish = defGetString(defel);
100 
101  if (!SplitIdentifierString(publish, ',', &publish_list))
102  ereport(ERROR,
103  (errcode(ERRCODE_SYNTAX_ERROR),
104  errmsg("invalid list syntax for \"publish\" option")));
105 
106  /* Process the option list. */
107  foreach(lc, publish_list)
108  {
109  char *publish_opt = (char *) lfirst(lc);
110 
111  if (strcmp(publish_opt, "insert") == 0)
112  *publish_insert = true;
113  else if (strcmp(publish_opt, "update") == 0)
114  *publish_update = true;
115  else if (strcmp(publish_opt, "delete") == 0)
116  *publish_delete = true;
117  else if (strcmp(publish_opt, "truncate") == 0)
118  *publish_truncate = true;
119  else
120  ereport(ERROR,
121  (errcode(ERRCODE_SYNTAX_ERROR),
122  errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
123  }
124  }
125  else
126  ereport(ERROR,
127  (errcode(ERRCODE_SYNTAX_ERROR),
128  errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
129  }
130 }
int errcode(int sqlerrcode)
Definition: elog.c:608
#define ERROR
Definition: elog.h:43
char * defGetString(DefElem *def)
Definition: define.c:49
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3652
#define ereport(elevel, rest)
Definition: elog.h:141
#define lfirst(lc)
Definition: pg_list.h:190
int errmsg(const char *fmt,...)
Definition: elog.c:822
char * defname
Definition: parsenodes.h:730
Definition: pg_list.h:50

◆ PublicationAddTables()

static void PublicationAddTables ( Oid  pubid,
List rels,
bool  if_not_exists,
AlterPublicationStmt stmt 
)
static

Definition at line 598 of file publicationcmds.c.

References aclcheck_error(), ACLCHECK_NOT_OWNER, Assert, EventTriggerCollectSimpleCommand(), AlterPublicationStmt::for_all_tables, get_relkind_objtype(), GetUserId(), InvalidObjectAddress, InvokeObjectPostCreateHook, lfirst, ObjectAddress::objectId, pg_class_ownercheck(), publication_add_relation(), RelationData::rd_rel, RelationGetRelationName, and RelationGetRelid.

Referenced by AlterPublicationTables(), and CreatePublication().

600 {
601  ListCell *lc;
602 
603  Assert(!stmt || !stmt->for_all_tables);
604 
605  foreach(lc, rels)
606  {
607  Relation rel = (Relation) lfirst(lc);
608  ObjectAddress obj;
609 
610  /* Must be owner of the table or superuser. */
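611  if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
612  aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
613  RelationGetRelationName(rel));
614 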
615  obj = publication_add_relation(pubid, rel, if_not_exists);
616  if (stmt)
617  {
618  EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
619  (Node *) stmt);
620 
621  InvokeObjectPostCreateHook(PublicationRelRelationId,
622  obj.objectId, 0);
623  }
624  }
625 }
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
Oid GetUserId(void)
Definition: miscinit.c:380
Definition: nodes.h:525
Form_pg_class rd_rel
Definition: rel.h:83
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3352
struct RelationData * Relation
Definition: relcache.h:26
#define RelationGetRelationName(relation)
Definition: rel.h:456
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190
bool pg_class_ownercheck(Oid class_oid, Oid roleid)
Definition: aclchk.c:4753
const ObjectAddress InvalidObjectAddress
ObjectType get_relkind_objtype(char relkind)
ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists)
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define RelationGetRelid(relation)
Definition: rel.h:422

◆ PublicationDropTables()

static void PublicationDropTables ( Oid  pubid,
List rels,
bool  missing_ok 
)
static

Definition at line 631 of file publicationcmds.c.

References DROP_CASCADE, ereport, errcode(), errmsg(), ERROR, GetSysCacheOid2, lfirst, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, performDeletion(), PUBLICATIONRELMAP, RelationGetRelationName, and RelationGetRelid.

Referenced by AlterPublicationTables().

632 {
633  ObjectAddress obj;
634  ListCell *lc;
635  Oid prid;
636 
637  foreach(lc, rels)
638  {
639  Relation rel = (Relation) lfirst(lc);
640  Oid relid = RelationGetRelid(rel);
641 
642  prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
643  ObjectIdGetDatum(relid),
644  ObjectIdGetDatum(pubid));
645  if (!OidIsValid(prid))
646  {
647  if (missing_ok)
648  continue;
649 
650  ereport(ERROR,
651  (errcode(ERRCODE_UNDEFINED_OBJECT),
652  errmsg("relation \"%s\" is not part of the publication",
653  RelationGetRelationName(rel))));
654  }
655 
656  ObjectAddressSet(obj, PublicationRelRelationId, prid);
657  performDeletion(&obj, DROP_CASCADE, 0);
658  }
659 }
int errcode(int sqlerrcode)
Definition: elog.c:608
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:645
struct RelationData * Relation
Definition: relcache.h:26
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:315
#define RelationGetRelationName(relation)
Definition: rel.h:456
#define ereport(elevel, rest)
Definition: elog.h:141
#define lfirst(lc)
Definition: pg_list.h:190
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:194
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define RelationGetRelid(relation)
Definition: rel.h:422

◆ RemovePublicationById()

void RemovePublicationById ( Oid  pubid)

Definition at line 450 of file publicationcmds.c.

References CatalogTupleDelete(), elog, ERROR, HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONOID, ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().

451 {
452  Relation rel;
453  HeapTuple tup;
454 
455  rel = table_open(PublicationRelationId, RowExclusiveLock);
456 
457  tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
458 
459  if (!HeapTupleIsValid(tup))
460  elog(ERROR, "cache lookup failed for publication %u", pubid);
461 
462  CatalogTupleDelete(rel, &tup->t_self);
463 
464  ReleaseSysCache(tup);
465 
466  table_close(rel, RowExclusiveLock);
467 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:269
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define RowExclusiveLock
Definition: lockdefs.h:38
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define elog(elevel,...)
Definition: elog.h:228
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ RemovePublicationRelById()

void RemovePublicationRelById ( Oid  proid)

Definition at line 473 of file publicationcmds.c.

References CacheInvalidateRelcacheByRelid(), CatalogTupleDelete(), elog, ERROR, GETSTRUCT, HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONREL, ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().

474 {
475  Relation rel;
476  HeapTuple tup;
477  Form_pg_publication_rel pubrel;
478 
479  rel = table_open(PublicationRelRelationId, RowExclusiveLock);
480 
481  tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
482 
483  if (!HeapTupleIsValid(tup))
484  elog(ERROR, "cache lookup failed for publication table %u",
485  proid);
486 
487  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
488 
489  /* Invalidate relcache so that publication info is rebuilt. */
490  CacheInvalidateRelcacheByRelid(pubrel->prrelid);
491 
492  CatalogTupleDelete(rel, &tup->t_self);
493 
494  ReleaseSysCache(tup);
495 
496  table_close(rel, RowExclusiveLock);
497 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:269
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define RowExclusiveLock
Definition: lockdefs.h:38
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1329
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define elog(elevel,...)
Definition: elog.h:228
FormData_pg_publication_rel * Form_pg_publication_rel
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39