PostgreSQL Source Code  git master
publicationcmds.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
#include "catalog/partition.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/publicationcmds.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/varlena.h"
Include dependency graph for publicationcmds.c:

Go to the source code of this file.

Macros

#define MAX_RELCACHE_INVAL_MSGS   4096
 

Functions

static List * OpenTableList (List *tables)
 
static void CloseTableList (List *rels)
 
static void PublicationAddTables (Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
 
static void PublicationDropTables (Oid pubid, List *rels, bool missing_ok)
 
static void parse_publication_options (List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root)
 
ObjectAddress CreatePublication (CreatePublicationStmt *stmt)
 
static void AlterPublicationOptions (AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
 
static void AlterPublicationTables (AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
 
void AlterPublication (AlterPublicationStmt *stmt)
 
void RemovePublicationRelById (Oid proid)
 
static void AlterPublicationOwner_internal (Relation rel, HeapTuple tup, Oid newOwnerId)
 
ObjectAddress AlterPublicationOwner (const char *name, Oid newOwnerId)
 
void AlterPublicationOwner_oid (Oid subid, Oid newOwnerId)
 

Macro Definition Documentation

◆ MAX_RELCACHE_INVAL_MSGS

#define MAX_RELCACHE_INVAL_MSGS   4096

Definition at line 49 of file publicationcmds.c.

Referenced by AlterPublicationOptions().

Function Documentation

◆ AlterPublication()

void AlterPublication ( AlterPublicationStmt * stmt )

Definition at line 437 of file publicationcmds.c.

References aclcheck_error(), ACLCHECK_NOT_OWNER, AlterPublicationOptions(), AlterPublicationTables(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, GetUserId(), heap_freetuple(), HeapTupleIsValid, OBJECT_PUBLICATION, AlterPublicationStmt::options, pg_publication_ownercheck(), PUBLICATIONNAME, AlterPublicationStmt::pubname, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ProcessUtilitySlow().

438 {
439  Relation rel;
440  HeapTuple tup;
441  Form_pg_publication pubform;
442 
443  rel = table_open(PublicationRelationId, RowExclusiveLock);
444 
446  CStringGetDatum(stmt->pubname));
447 
448  if (!HeapTupleIsValid(tup))
449  ereport(ERROR,
450  (errcode(ERRCODE_UNDEFINED_OBJECT),
451  errmsg("publication \"%s\" does not exist",
452  stmt->pubname)));
453 
454  pubform = (Form_pg_publication) GETSTRUCT(tup);
455 
456  /* must be owner */
457  if (!pg_publication_ownercheck(pubform->oid, GetUserId()))
459  stmt->pubname);
460 
461  if (stmt->options)
462  AlterPublicationOptions(stmt, rel, tup);
463  else
464  AlterPublicationTables(stmt, rel, tup);
465 
466  /* Cleanup. */
467  heap_freetuple(tup);
469 }
static void AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Oid GetUserId(void)
Definition: miscinit.c:476
bool pg_publication_ownercheck(Oid pub_oid, Oid roleid)
Definition: aclchk.c:5225
int errcode(int sqlerrcode)
Definition: elog.c:610
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
static void AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup)
#define ereport(elevel,...)
Definition: elog.h:144
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:824
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationOptions()

static void AlterPublicationOptions ( AlterPublicationStmt * stmt,
Relation  rel,
HeapTuple  tup 
)
static

Definition at line 259 of file publicationcmds.c.

References BoolGetDatum, CacheInvalidateRelcacheAll(), CacheInvalidateRelcacheByRelid(), CatalogTupleUpdate(), CommandCounterIncrement(), EventTriggerCollectSimpleCommand(), GetPublicationRelations(), GETSTRUCT, heap_modify_tuple(), InvalidObjectAddress, InvokeObjectPostAlterHook, lfirst_oid, list_length(), MAX_RELCACHE_INVAL_MSGS, ObjectAddressSet, AlterPublicationStmt::options, parse_publication_options(), PublicationActions::pubdelete, PublicationActions::pubinsert, PUBLICATION_PART_ALL, PublicationActions::pubtruncate, PublicationActions::pubupdate, RelationGetDescr, HeapTupleData::t_self, and values.

Referenced by AlterPublication().

261 {
262  bool nulls[Natts_pg_publication];
263  bool replaces[Natts_pg_publication];
264  Datum values[Natts_pg_publication];
265  bool publish_given;
266  PublicationActions pubactions;
267  bool publish_via_partition_root_given;
268  bool publish_via_partition_root;
269  ObjectAddress obj;
270  Form_pg_publication pubform;
271 
273  &publish_given, &pubactions,
274  &publish_via_partition_root_given,
275  &publish_via_partition_root);
276 
277  /* Everything ok, form a new tuple. */
278  memset(values, 0, sizeof(values));
279  memset(nulls, false, sizeof(nulls));
280  memset(replaces, false, sizeof(replaces));
281 
282  if (publish_given)
283  {
284  values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
285  replaces[Anum_pg_publication_pubinsert - 1] = true;
286 
287  values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
288  replaces[Anum_pg_publication_pubupdate - 1] = true;
289 
290  values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
291  replaces[Anum_pg_publication_pubdelete - 1] = true;
292 
293  values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
294  replaces[Anum_pg_publication_pubtruncate - 1] = true;
295  }
296 
297  if (publish_via_partition_root_given)
298  {
299  values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
300  replaces[Anum_pg_publication_pubviaroot - 1] = true;
301  }
302 
303  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
304  replaces);
305 
306  /* Update the catalog. */
307  CatalogTupleUpdate(rel, &tup->t_self, tup);
308 
310 
311  pubform = (Form_pg_publication) GETSTRUCT(tup);
312 
313  /* Invalidate the relcache. */
314  if (pubform->puballtables)
315  {
317  }
318  else
319  {
320  /*
321  * For any partitioned tables contained in the publication, we must
322  * invalidate all partitions contained in the respective partition
323  * trees, not just those explicitly mentioned in the publication.
324  */
325  List *relids = GetPublicationRelations(pubform->oid,
327 
328  /*
329  * We don't want to send too many individual messages, at some point
330  * it's cheaper to just reset whole relcache.
331  */
332  if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
333  {
334  ListCell *lc;
335 
336  foreach(lc, relids)
337  {
338  Oid relid = lfirst_oid(lc);
339 
341  }
342  }
343  else
345  }
346 
347  ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
349  (Node *) stmt);
350 
351  InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
352 }
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
#define RelationGetDescr(relation)
Definition: rel.h:482
Definition: nodes.h:528
unsigned int Oid
Definition: postgres_ext.h:31
#define MAX_RELCACHE_INVAL_MSGS
ItemPointerData t_self
Definition: htup.h:65
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1337
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
void CacheInvalidateRelcacheAll(void)
Definition: inval.c:1302
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1021
#define BoolGetDatum(X)
Definition: postgres.h:402
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
static int list_length(const List *l)
Definition: pg_list.h:169
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:165
static void parse_publication_options(List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root)
const ObjectAddress InvalidObjectAddress
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
FormData_pg_publication * Form_pg_publication
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
Definition: pg_list.h:50
#define lfirst_oid(lc)
Definition: pg_list.h:191

◆ AlterPublicationOwner()

ObjectAddress AlterPublicationOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 724 of file publicationcmds.c.

References AlterPublicationOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), HeapTupleIsValid, ObjectAddressSet, PUBLICATIONNAME, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

725 {
726  Oid subid;
727  HeapTuple tup;
728  Relation rel;
729  ObjectAddress address;
730  Form_pg_publication pubform;
731 
732  rel = table_open(PublicationRelationId, RowExclusiveLock);
733 
735 
736  if (!HeapTupleIsValid(tup))
737  ereport(ERROR,
738  (errcode(ERRCODE_UNDEFINED_OBJECT),
739  errmsg("publication \"%s\" does not exist", name)));
740 
741  pubform = (Form_pg_publication) GETSTRUCT(tup);
742  subid = pubform->oid;
743 
744  AlterPublicationOwner_internal(rel, tup, newOwnerId);
745 
746  ObjectAddressSet(address, PublicationRelationId, subid);
747 
748  heap_freetuple(tup);
749 
751 
752  return address;
753 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
int errcode(int sqlerrcode)
Definition: elog.c:610
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define ereport(elevel,...)
Definition: elog.h:144
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
const char * name
Definition: encode.c:561
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:824
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationOwner_internal()

static void AlterPublicationOwner_internal ( Relation  rel,
HeapTuple  tup,
Oid  newOwnerId 
)
static

Definition at line 673 of file publicationcmds.c.

References ACL_CREATE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, CatalogTupleUpdate(), changeDependencyOnOwner(), check_is_member_of_role(), ereport, errcode(), errhint(), errmsg(), ERROR, get_database_name(), GETSTRUCT, GetUserId(), InvokeObjectPostAlterHook, MyDatabaseId, NameStr, OBJECT_DATABASE, OBJECT_PUBLICATION, pg_database_aclcheck(), pg_publication_ownercheck(), superuser(), superuser_arg(), and HeapTupleData::t_self.

Referenced by AlterPublicationOwner(), and AlterPublicationOwner_oid().

674 {
675  Form_pg_publication form;
676 
677  form = (Form_pg_publication) GETSTRUCT(tup);
678 
679  if (form->pubowner == newOwnerId)
680  return;
681 
682  if (!superuser())
683  {
684  AclResult aclresult;
685 
686  /* Must be owner */
687  if (!pg_publication_ownercheck(form->oid, GetUserId()))
689  NameStr(form->pubname));
690 
691  /* Must be able to become new owner */
692  check_is_member_of_role(GetUserId(), newOwnerId);
693 
694  /* New owner must have CREATE privilege on database */
695  aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
696  if (aclresult != ACLCHECK_OK)
697  aclcheck_error(aclresult, OBJECT_DATABASE,
699 
700  if (form->puballtables && !superuser_arg(newOwnerId))
701  ereport(ERROR,
702  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
703  errmsg("permission denied to change owner of publication \"%s\"",
704  NameStr(form->pubname)),
705  errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
706  }
707 
708  form->pubowner = newOwnerId;
709  CatalogTupleUpdate(rel, &tup->t_self, tup);
710 
711  /* Update owner dependency reference */
712  changeDependencyOnOwner(PublicationRelationId,
713  form->oid,
714  newOwnerId);
715 
716  InvokeObjectPostAlterHook(PublicationRelationId,
717  form->oid, 0);
718 }
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Oid GetUserId(void)
Definition: miscinit.c:476
bool pg_publication_ownercheck(Oid pub_oid, Oid roleid)
Definition: aclchk.c:5225
int errcode(int sqlerrcode)
Definition: elog.c:610
bool superuser(void)
Definition: superuser.c:46
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:309
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
#define ERROR
Definition: elog.h:43
#define ACL_CREATE
Definition: parsenodes.h:84
ItemPointerData t_self
Definition: htup.h:65
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
void check_is_member_of_role(Oid member, Oid role)
Definition: acl.c:4938
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
AclResult
Definition: acl.h:177
Oid MyDatabaseId
Definition: globals.c:85
#define ereport(elevel,...)
Definition: elog.h:144
AclResult pg_database_aclcheck(Oid db_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4575
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define NameStr(name)
Definition: c.h:622
FormData_pg_publication * Form_pg_publication

◆ AlterPublicationOwner_oid()

void AlterPublicationOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 759 of file publicationcmds.c.

References AlterPublicationOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONOID, RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by shdepReassignOwned().

760 {
761  HeapTuple tup;
762  Relation rel;
763 
764  rel = table_open(PublicationRelationId, RowExclusiveLock);
765 
767 
768  if (!HeapTupleIsValid(tup))
769  ereport(ERROR,
770  (errcode(ERRCODE_UNDEFINED_OBJECT),
771  errmsg("publication with OID %u does not exist", subid)));
772 
773  AlterPublicationOwner_internal(rel, tup, newOwnerId);
774 
775  heap_freetuple(tup);
776 
778 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
int errcode(int sqlerrcode)
Definition: elog.c:610
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
static void AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define ereport(elevel,...)
Definition: elog.h:144
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:824
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterPublicationTables()

static void AlterPublicationTables ( AlterPublicationStmt * stmt,
Relation  rel,
HeapTuple  tup 
)
static

Definition at line 358 of file publicationcmds.c.

References Assert, CloseTableList(), DEFELEM_ADD, DEFELEM_DROP, ereport, errcode(), errdetail(), errmsg(), ERROR, GetPublicationRelations(), GETSTRUCT, lappend(), lfirst, lfirst_oid, list_length(), NameStr, NIL, OpenTableList(), PUBLICATION_PART_ROOT, PublicationAddTables(), PublicationDropTables(), RelationGetRelid, ShareUpdateExclusiveLock, table_open(), AlterPublicationStmt::tableAction, and AlterPublicationStmt::tables.

Referenced by AlterPublication().

360 {
361  List *rels = NIL;
363  Oid pubid = pubform->oid;
364 
365  /* Check that user is allowed to manipulate the publication tables. */
366  if (pubform->puballtables)
367  ereport(ERROR,
368  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
369  errmsg("publication \"%s\" is defined as FOR ALL TABLES",
370  NameStr(pubform->pubname)),
371  errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
372 
373  Assert(list_length(stmt->tables) > 0);
374 
375  rels = OpenTableList(stmt->tables);
376 
377  if (stmt->tableAction == DEFELEM_ADD)
378  PublicationAddTables(pubid, rels, false, stmt);
379  else if (stmt->tableAction == DEFELEM_DROP)
380  PublicationDropTables(pubid, rels, false);
381  else /* DEFELEM_SET */
382  {
383  List *oldrelids = GetPublicationRelations(pubid,
385  List *delrels = NIL;
386  ListCell *oldlc;
387 
388  /* Calculate which relations to drop. */
389  foreach(oldlc, oldrelids)
390  {
391  Oid oldrelid = lfirst_oid(oldlc);
392  ListCell *newlc;
393  bool found = false;
394 
395  foreach(newlc, rels)
396  {
397  Relation newrel = (Relation) lfirst(newlc);
398 
399  if (RelationGetRelid(newrel) == oldrelid)
400  {
401  found = true;
402  break;
403  }
404  }
405 
406  if (!found)
407  {
408  Relation oldrel = table_open(oldrelid,
410 
411  delrels = lappend(delrels, oldrel);
412  }
413  }
414 
415  /* And drop them. */
416  PublicationDropTables(pubid, delrels, true);
417 
418  /*
419  * Don't bother calculating the difference for adding, we'll catch and
420  * skip existing ones when doing catalog update.
421  */
422  PublicationAddTables(pubid, rels, true, stmt);
423 
424  CloseTableList(delrels);
425  }
426 
427  CloseTableList(rels);
428 }
#define NIL
Definition: pg_list.h:65
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
int errcode(int sqlerrcode)
Definition: elog.c:610
unsigned int Oid
Definition: postgres_ext.h:31
static void CloseTableList(List *rels)
struct RelationData * Relation
Definition: relcache.h:27
#define ERROR
Definition: elog.h:43
int errdetail(const char *fmt,...)
Definition: elog.c:957
List * lappend(List *list, void *datum)
Definition: list.c:321
DefElemAction tableAction
Definition: parsenodes.h:3537
#define ereport(elevel,...)
Definition: elog.h:144
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:189
static int list_length(const List *l)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define NameStr(name)
Definition: c.h:622
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:456
static List * OpenTableList(List *tables)
#define lfirst_oid(lc)
Definition: pg_list.h:191

◆ CloseTableList()

static void CloseTableList ( List * rels )
static

Definition at line 590 of file publicationcmds.c.

References lfirst, NoLock, and table_close().

Referenced by AlterPublicationTables(), and CreatePublication().

591 {
592  ListCell *lc;
593 
594  foreach(lc, rels)
595  {
596  Relation rel = (Relation) lfirst(lc);
597 
598  table_close(rel, NoLock);
599  }
600 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
struct RelationData * Relation
Definition: relcache.h:27
#define NoLock
Definition: lockdefs.h:34
#define lfirst(lc)
Definition: pg_list.h:189

◆ CreatePublication()

ObjectAddress CreatePublication ( CreatePublicationStmt * stmt )

Definition at line 148 of file publicationcmds.c.

References ACL_CREATE, aclcheck_error(), ACLCHECK_OK, Assert, BoolGetDatum, CatalogTupleInsert(), CloseTableList(), CommandCounterIncrement(), CStringGetDatum, DirectFunctionCall1, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg(), ERROR, CreatePublicationStmt::for_all_tables, get_database_name(), GetNewOidWithIndex(), GetSysCacheOid1, GetUserId(), heap_form_tuple(), heap_freetuple(), InvokeObjectPostCreateHook, list_length(), MyDatabaseId, namein(), OBJECT_DATABASE, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, OpenTableList(), CreatePublicationStmt::options, parse_publication_options(), pg_database_aclcheck(), PublicationActions::pubdelete, PublicationActions::pubinsert, PublicationAddTables(), PUBLICATIONNAME, PublicationObjectIndexId, CreatePublicationStmt::pubname, PublicationActions::pubtruncate, PublicationActions::pubupdate, recordDependencyOnOwner(), RelationGetDescr, RowExclusiveLock, superuser(), table_close(), table_open(), CreatePublicationStmt::tables, values, wal_level, WAL_LEVEL_LOGICAL, and WARNING.

Referenced by ProcessUtilitySlow().

149 {
150  Relation rel;
151  ObjectAddress myself;
152  Oid puboid;
153  bool nulls[Natts_pg_publication];
154  Datum values[Natts_pg_publication];
155  HeapTuple tup;
156  bool publish_given;
157  PublicationActions pubactions;
158  bool publish_via_partition_root_given;
159  bool publish_via_partition_root;
160  AclResult aclresult;
161 
162  /* must have CREATE privilege on database */
164  if (aclresult != ACLCHECK_OK)
165  aclcheck_error(aclresult, OBJECT_DATABASE,
167 
168  /* FOR ALL TABLES requires superuser */
169  if (stmt->for_all_tables && !superuser())
170  ereport(ERROR,
171  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
172  errmsg("must be superuser to create FOR ALL TABLES publication")));
173 
174  rel = table_open(PublicationRelationId, RowExclusiveLock);
175 
176  /* Check if name is used */
177  puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
178  CStringGetDatum(stmt->pubname));
179  if (OidIsValid(puboid))
180  {
181  ereport(ERROR,
183  errmsg("publication \"%s\" already exists",
184  stmt->pubname)));
185  }
186 
187  /* Form a tuple. */
188  memset(values, 0, sizeof(values));
189  memset(nulls, false, sizeof(nulls));
190 
191  values[Anum_pg_publication_pubname - 1] =
193  values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
194 
196  &publish_given, &pubactions,
197  &publish_via_partition_root_given,
198  &publish_via_partition_root);
199 
201  Anum_pg_publication_oid);
202  values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
203  values[Anum_pg_publication_puballtables - 1] =
205  values[Anum_pg_publication_pubinsert - 1] =
206  BoolGetDatum(pubactions.pubinsert);
207  values[Anum_pg_publication_pubupdate - 1] =
208  BoolGetDatum(pubactions.pubupdate);
209  values[Anum_pg_publication_pubdelete - 1] =
210  BoolGetDatum(pubactions.pubdelete);
211  values[Anum_pg_publication_pubtruncate - 1] =
212  BoolGetDatum(pubactions.pubtruncate);
213  values[Anum_pg_publication_pubviaroot - 1] =
214  BoolGetDatum(publish_via_partition_root);
215 
216  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
217 
218  /* Insert tuple into catalog. */
219  CatalogTupleInsert(rel, tup);
220  heap_freetuple(tup);
221 
222  recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
223 
224  ObjectAddressSet(myself, PublicationRelationId, puboid);
225 
226  /* Make the changes visible. */
228 
229  if (stmt->tables)
230  {
231  List *rels;
232 
233  Assert(list_length(stmt->tables) > 0);
234 
235  rels = OpenTableList(stmt->tables);
236  PublicationAddTables(puboid, rels, true, NULL);
237  CloseTableList(rels);
238  }
239 
241 
242  InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
243 
245  {
247  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
248  errmsg("wal_level is insufficient to publish logical changes"),
249  errhint("Set wal_level to logical before creating subscriptions.")));
250  }
251 
252  return myself;
253 }
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:317
static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt)
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
#define RelationGetDescr(relation)
Definition: rel.h:482
Oid GetUserId(void)
Definition: miscinit.c:476
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:192
int wal_level
Definition: xlog.c:107
int errcode(int sqlerrcode)
Definition: elog.c:610
bool superuser(void)
Definition: superuser.c:46
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:624
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:164
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:651
static void CloseTableList(List *rels)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define PublicationObjectIndexId
Definition: indexing.h:357
#define ACL_CREATE
Definition: parsenodes.h:84
char * get_database_name(Oid dbid)
Definition: dbcommands.c:2155
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
#define WARNING
Definition: elog.h:40
AclResult
Definition: acl.h:177
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1021
Oid MyDatabaseId
Definition: globals.c:85
#define BoolGetDatum(X)
Definition: postgres.h:402
#define ereport(elevel,...)
Definition: elog.h:144
AclResult pg_database_aclcheck(Oid db_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4575
#define Assert(condition)
Definition: c.h:745
static int list_length(const List *l)
Definition: pg_list.h:169
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:165
static void parse_publication_options(List *options, bool *publish_given, PublicationActions *pubactions, bool *publish_via_partition_root_given, bool *publish_via_partition_root)
int errmsg(const char *fmt,...)
Definition: elog.c:824
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
Definition: pg_list.h:50
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:221
static List * OpenTableList(List *tables)

◆ OpenTableList()

static List * OpenTableList ( List * tables )
static

Definition at line 507 of file publicationcmds.c.

References castNode, CHECK_FOR_INTERRUPTS, find_all_inheritors(), RangeVar::inh, lappend(), lappend_oid(), lfirst, lfirst_oid, list_free(), list_member_oid(), NIL, NoLock, RelationData::rd_rel, RelationGetRelid, ShareUpdateExclusiveLock, table_close(), table_open(), and table_openrv().

Referenced by AlterPublicationTables(), and CreatePublication().

508 {
509  List *relids = NIL;
510  List *rels = NIL;
511  ListCell *lc;
512 
513  /*
514  * Open, share-lock, and check all the explicitly-specified relations
515  */
516  foreach(lc, tables)
517  {
518  RangeVar *rv = castNode(RangeVar, lfirst(lc));
519  bool recurse = rv->inh;
520  Relation rel;
521  Oid myrelid;
522 
523  /* Allow query cancel in case this takes a long time */
525 
527  myrelid = RelationGetRelid(rel);
528 
529  /*
530  * Filter out duplicates if user specifies "foo, foo".
531  *
532  * Note that this algorithm is known to not be very efficient (O(N^2))
533  * but given that it only works on list of tables given to us by user
534  * it's deemed acceptable.
535  */
536  if (list_member_oid(relids, myrelid))
537  {
539  continue;
540  }
541 
542  rels = lappend(rels, rel);
543  relids = lappend_oid(relids, myrelid);
544 
545  /*
546  * Add children of this rel, if requested, so that they too are added
547  * to the publication. A partitioned table can't have any inheritance
548  * children other than its partitions, which need not be explicitly
549  * added to the publication.
550  */
551  if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
552  {
553  List *children;
554  ListCell *child;
555 
556  children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
557  NULL);
558 
559  foreach(child, children)
560  {
561  Oid childrelid = lfirst_oid(child);
562 
563  /* Allow query cancel in case this takes a long time */
565 
566  /*
567  * Skip duplicates if user specified both parent and child
568  * tables.
569  */
570  if (list_member_oid(relids, childrelid))
571  continue;
572 
573  /* find_all_inheritors already got lock */
574  rel = table_open(childrelid, NoLock);
575  rels = lappend(rels, rel);
576  relids = lappend_oid(relids, childrelid);
577  }
578  }
579  }
580 
581  list_free(relids);
582 
583  return rels;
584 }
#define NIL
Definition: pg_list.h:65
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
#define castNode(_type_, nodeptr)
Definition: nodes.h:597
Form_pg_class rd_rel
Definition: rel.h:109
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:357
#define NoLock
Definition: lockdefs.h:34
bool inh
Definition: primnodes.h:69
List * lappend(List *list, void *datum)
Definition: list.c:321
Relation table_openrv(const RangeVar *relation, LOCKMODE lockmode)
Definition: table.c:102
#define ShareUpdateExclusiveLock
Definition: lockdefs.h:39
bool list_member_oid(const List *list, Oid datum)
Definition: list.c:674
#define lfirst(lc)
Definition: pg_list.h:189
List * find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **numparents)
Definition: pg_inherits.c:165
void list_free(List *list)
Definition: list.c:1376
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:99
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:456
#define lfirst_oid(lc)
Definition: pg_list.h:191

◆ parse_publication_options()

static void parse_publication_options ( List *options,
bool *publish_given,
PublicationActions *pubactions,
bool *publish_via_partition_root_given,
bool *publish_via_partition_root 
)
static

Definition at line 58 of file publicationcmds.c.

References defGetBoolean(), defGetString(), DefElem::defname, ereport, errcode(), errmsg(), ERROR, lfirst, PublicationActions::pubdelete, PublicationActions::pubinsert, PublicationActions::pubtruncate, PublicationActions::pubupdate, and SplitIdentifierString().

Referenced by AlterPublicationOptions(), and CreatePublication().

63 {
    /*
     * Parse the option list given to CREATE/ALTER PUBLICATION.
     *
     * options: list of DefElem nodes; only "publish" and
     *     "publish_via_partition_root" are recognized.
     * publish_given / publish_via_partition_root_given: set to true when the
     *     corresponding option appears in the list, false otherwise.
     * pubactions: every action flag defaults to true; when "publish" is
     *     given, all flags are cleared and only the listed actions
     *     (insert, update, delete, truncate) are turned back on.
     * publish_via_partition_root: defaults to false; otherwise receives the
     *     boolean value of the option.
     *
     * Raises ERRCODE_SYNTAX_ERROR for a repeated option, a malformed
     * "publish" list, an unknown "publish" value, or an unknown option name.
     */
64  ListCell *lc;
65 
66  *publish_given = false;
67  *publish_via_partition_root_given = false;
68 
69  /* defaults */
70  pubactions->pubinsert = true;
71  pubactions->pubupdate = true;
72  pubactions->pubdelete = true;
73  pubactions->pubtruncate = true;
74  *publish_via_partition_root = false;
75 
76  /* Parse options */
77  foreach(lc, options)
78  {
79  DefElem *defel = (DefElem *) lfirst(lc);
80 
81  if (strcmp(defel->defname, "publish") == 0)
82  {
83  char *publish;
84  List *publish_list;
    /* Distinct name so the inner cursor does not shadow the outer "lc". */
85  ListCell *lc2;
86 
87  if (*publish_given)
88  ereport(ERROR,
89  (errcode(ERRCODE_SYNTAX_ERROR),
90  errmsg("conflicting or redundant options")));
91 
92  /*
93  * If publish option was given only the explicitly listed actions
94  * should be published.
95  */
96  pubactions->pubinsert = false;
97  pubactions->pubupdate = false;
98  pubactions->pubdelete = false;
99  pubactions->pubtruncate = false;
100 
101  *publish_given = true;
102  publish = defGetString(defel);
103 
104  if (!SplitIdentifierString(publish, ',', &publish_list))
105  ereport(ERROR,
106  (errcode(ERRCODE_SYNTAX_ERROR),
107  errmsg("invalid list syntax for \"publish\" option")));
108 
109  /* Process the option list. */
110  foreach(lc2, publish_list)
111  {
112  char *publish_opt = (char *) lfirst(lc2);
113 
114  if (strcmp(publish_opt, "insert") == 0)
115  pubactions->pubinsert = true;
116  else if (strcmp(publish_opt, "update") == 0)
117  pubactions->pubupdate = true;
118  else if (strcmp(publish_opt, "delete") == 0)
119  pubactions->pubdelete = true;
120  else if (strcmp(publish_opt, "truncate") == 0)
121  pubactions->pubtruncate = true;
122  else
123  ereport(ERROR,
124  (errcode(ERRCODE_SYNTAX_ERROR),
125  errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
126  }
127  }
128  else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
129  {
130  if (*publish_via_partition_root_given)
131  ereport(ERROR,
132  (errcode(ERRCODE_SYNTAX_ERROR),
133  errmsg("conflicting or redundant options")));
134  *publish_via_partition_root_given = true;
135  *publish_via_partition_root = defGetBoolean(defel);
136  }
137  else
138  ereport(ERROR,
139  (errcode(ERRCODE_SYNTAX_ERROR),
140  errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
141  }
142 }
int errcode(int sqlerrcode)
Definition: elog.c:610
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
char * defGetString(DefElem *def)
Definition: define.c:49
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:3702
#define ereport(elevel,...)
Definition: elog.h:144
#define lfirst(lc)
Definition: pg_list.h:189
int errmsg(const char *fmt,...)
Definition: elog.c:824
char * defname
Definition: parsenodes.h:733
Definition: pg_list.h:50

◆ PublicationAddTables()

static void PublicationAddTables ( Oid  pubid,
List *rels,
bool  if_not_exists,
AlterPublicationStmt *stmt 
)
static

Definition at line 606 of file publicationcmds.c.

References aclcheck_error(), ACLCHECK_NOT_OWNER, Assert, EventTriggerCollectSimpleCommand(), AlterPublicationStmt::for_all_tables, get_relkind_objtype(), GetUserId(), InvalidObjectAddress, InvokeObjectPostCreateHook, lfirst, ObjectAddress::objectId, pg_class_ownercheck(), publication_add_relation(), RelationData::rd_rel, RelationGetRelationName, and RelationGetRelid.

Referenced by AlterPublicationTables(), and CreatePublication().

608 {
    /*
     * Add every relation in "rels" to publication "pubid".
     *
     * rels: list of already-opened Relation handles.
     * if_not_exists: passed through to publication_add_relation().
     * stmt: may be NULL; when set it must not be a FOR ALL TABLES statement,
     *     and each addition is also reported to event triggers and to the
     *     object post-create hook.
     *
     * Errors out via aclcheck_error() if the current user does not own one
     * of the relations.
     */
609  ListCell *lc;
610 
611  Assert(!stmt || !stmt->for_all_tables);
612 
613  foreach(lc, rels)
614  {
615  Relation rel = (Relation) lfirst(lc);
616  ObjectAddress obj;
617 
618  /* Must be owner of the table or superuser. */
619  if (!pg_class_ownercheck(RelationGetRelid(rel), GetUserId()))
620  aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
621  RelationGetRelationName(rel));
622 
623  obj = publication_add_relation(pubid, rel, if_not_exists);
624  if (stmt)
625  {
626  EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
627  (Node *) stmt);
628 
629  InvokeObjectPostCreateHook(PublicationRelRelationId,
630  obj.objectId, 0);
631  }
632  }
633 }
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
Oid GetUserId(void)
Definition: miscinit.c:476
Definition: nodes.h:528
Form_pg_class rd_rel
Definition: rel.h:109
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
struct RelationData * Relation
Definition: relcache.h:27
#define RelationGetRelationName(relation)
Definition: rel.h:490
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:189
bool pg_class_ownercheck(Oid class_oid, Oid roleid)
Definition: aclchk.c:4687
const ObjectAddress InvalidObjectAddress
ObjectType get_relkind_objtype(char relkind)
ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists)
void EventTriggerCollectSimpleCommand(ObjectAddress address, ObjectAddress secondaryObject, Node *parsetree)
#define RelationGetRelid(relation)
Definition: rel.h:456

◆ PublicationDropTables()

static void PublicationDropTables ( Oid  pubid,
List *rels,
bool  missing_ok 
)
static

Definition at line 639 of file publicationcmds.c.

References DROP_CASCADE, ereport, errcode(), errmsg(), ERROR, GetSysCacheOid2, lfirst, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, performDeletion(), PUBLICATIONRELMAP, RelationGetRelationName, and RelationGetRelid.

Referenced by AlterPublicationTables().

640 {
    /*
     * Remove every relation in "rels" from publication "pubid".
     *
     * rels: list of already-opened Relation handles.
     * missing_ok: when true, relations that are not members of the
     *     publication are silently skipped; when false they raise
     *     ERRCODE_UNDEFINED_OBJECT.
     */
641  ObjectAddress obj;
642  ListCell *lc;
643  Oid prid;
644 
645  foreach(lc, rels)
646  {
647  Relation rel = (Relation) lfirst(lc);
648  Oid relid = RelationGetRelid(rel);
649 
    /* Look up the membership row's OID by (relation, publication). */
650  prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
651  ObjectIdGetDatum(relid),
652  ObjectIdGetDatum(pubid));
653  if (!OidIsValid(prid))
654  {
    /* Not a member: tolerate it only if the caller asked us to. */
655  if (missing_ok)
656  continue;
657 
658  ereport(ERROR,
659  (errcode(ERRCODE_UNDEFINED_OBJECT),
660  errmsg("relation \"%s\" is not part of the publication",
661  RelationGetRelationName(rel))));
662  }
663 
    /* Drop the membership row through the dependency machinery. */
664  ObjectAddressSet(obj, PublicationRelRelationId, prid);
665  performDeletion(&obj, DROP_CASCADE, 0);
666  }
667 }
int errcode(int sqlerrcode)
Definition: elog.c:610
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:651
struct RelationData * Relation
Definition: relcache.h:27
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:312
#define RelationGetRelationName(relation)
Definition: rel.h:490
#define ereport(elevel,...)
Definition: elog.h:144
#define lfirst(lc)
Definition: pg_list.h:189
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:194
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define RelationGetRelid(relation)
Definition: rel.h:456

◆ RemovePublicationRelById()

void RemovePublicationRelById ( Oid  proid)

Definition at line 475 of file publicationcmds.c.

References CacheInvalidateRelcacheByRelid(), CatalogTupleDelete(), elog, ERROR, GETSTRUCT, HeapTupleIsValid, ObjectIdGetDatum, PUBLICATIONREL, ReleaseSysCache(), RowExclusiveLock, SearchSysCache1(), HeapTupleData::t_self, table_close(), and table_open().

Referenced by doDeletion().

476 {
    /*
     * Delete the pg_publication_rel row whose OID is "proid".
     *
     * Raises an internal error if no such row is found in the syscache.
     * The relcache entry of the published relation is invalidated so that
     * its publication info gets rebuilt.
     */
477  Relation rel;
478  HeapTuple tup;
479  Form_pg_publication_rel pubrel;
480 
481  rel = table_open(PublicationRelRelationId, RowExclusiveLock);
482 
483  tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
484 
485  if (!HeapTupleIsValid(tup))
486  elog(ERROR, "cache lookup failed for publication table %u",
487  proid);
488 
489  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
490 
491  /* Invalidate relcache so that publication info is rebuilt. */
492  CacheInvalidateRelcacheByRelid(pubrel->prrelid);
493 
494  CatalogTupleDelete(rel, &tup->t_self);
495 
496  ReleaseSysCache(tup);
497 
    /* Close the catalog with the same lock mode it was opened with. */
498  table_close(rel, RowExclusiveLock);
499 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:350
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define RowExclusiveLock
Definition: lockdefs.h:38
void CacheInvalidateRelcacheByRelid(Oid relid)
Definition: inval.c:1337
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define elog(elevel,...)
Definition: elog.h:214
FormData_pg_publication_rel * Form_pg_publication_rel
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39