PostgreSQL Source Code  git master
subscriptioncmds.c File Reference
Include dependency graph for subscriptioncmds.c:

Go to the source code of this file.

Functions

static Listfetch_table_list (WalReceiverConn *wrconn, List *publications)
 
static void parse_subscription_options (List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
 
static Datum publicationListToArray (List *publist)
 
ObjectAddress CreateSubscription (CreateSubscriptionStmt *stmt, bool isTopLevel)
 
static void AlterSubscription_refresh (Subscription *sub, bool copy_data)
 
ObjectAddress AlterSubscription (AlterSubscriptionStmt *stmt)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
static void AlterSubscriptionOwner_internal (Relation rel, HeapTuple tup, Oid newOwnerId)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 

Function Documentation

◆ AlterSubscription()

ObjectAddress AlterSubscription ( AlterSubscriptionStmt stmt)

Definition at line 621 of file subscriptioncmds.c.

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, AlterSubscription_refresh(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleUpdate(), AlterSubscriptionStmt::conninfo, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, elog, Subscription::enabled, ereport, errcode(), errhint(), errmsg(), ERROR, GETSTRUCT, GetSubscription(), GetUserId(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvokeObjectPostAlterHook, AlterSubscriptionStmt::kind, load_file(), LockSharedObject(), MyDatabaseId, namein(), OBJECT_SUBSCRIPTION, ObjectAddressSet, AlterSubscriptionStmt::options, parse_subscription_options(), pg_subscription_ownercheck(), AlterSubscriptionStmt::publication, publicationListToArray(), Subscription::publications, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, AlterSubscriptionStmt::subname, SUBSCRIPTIONNAME, synchronous_commit, HeapTupleData::t_self, table_close(), table_open(), values, and walrcv_check_conninfo.

Referenced by ProcessUtilitySlow().

622 {
623  Relation rel;
624  ObjectAddress myself;
625  bool nulls[Natts_pg_subscription];
626  bool replaces[Natts_pg_subscription];
627  Datum values[Natts_pg_subscription];
628  HeapTuple tup;
629  Oid subid;
630  bool update_tuple = false;
631  Subscription *sub;
633 
634  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
635 
636  /* Fetch the existing tuple. */
638  CStringGetDatum(stmt->subname));
639 
640  if (!HeapTupleIsValid(tup))
641  ereport(ERROR,
642  (errcode(ERRCODE_UNDEFINED_OBJECT),
643  errmsg("subscription \"%s\" does not exist",
644  stmt->subname)));
645 
646  form = (Form_pg_subscription) GETSTRUCT(tup);
647  subid = form->oid;
648 
649  /* must be owner */
650  if (!pg_subscription_ownercheck(subid, GetUserId()))
652  stmt->subname);
653 
654  sub = GetSubscription(subid, false);
655 
656  /* Lock the subscription so nobody else can do anything with it. */
657  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
658 
659  /* Form a new tuple. */
660  memset(values, 0, sizeof(values));
661  memset(nulls, false, sizeof(nulls));
662  memset(replaces, false, sizeof(replaces));
663 
664  switch (stmt->kind)
665  {
667  {
668  char *slotname;
669  bool slotname_given;
670  char *synchronous_commit;
671 
672  parse_subscription_options(stmt->options, NULL, NULL, NULL,
673  NULL, &slotname_given, &slotname,
674  NULL, &synchronous_commit, NULL);
675 
676  if (slotname_given)
677  {
678  if (sub->enabled && !slotname)
679  ereport(ERROR,
680  (errcode(ERRCODE_SYNTAX_ERROR),
681  errmsg("cannot set %s for enabled subscription",
682  "slot_name = NONE")));
683 
684  if (slotname)
685  values[Anum_pg_subscription_subslotname - 1] =
687  else
688  nulls[Anum_pg_subscription_subslotname - 1] = true;
689  replaces[Anum_pg_subscription_subslotname - 1] = true;
690  }
691 
692  if (synchronous_commit)
693  {
694  values[Anum_pg_subscription_subsynccommit - 1] =
695  CStringGetTextDatum(synchronous_commit);
696  replaces[Anum_pg_subscription_subsynccommit - 1] = true;
697  }
698 
699  update_tuple = true;
700  break;
701  }
702 
704  {
705  bool enabled,
706  enabled_given;
707 
709  &enabled_given, &enabled, NULL,
710  NULL, NULL, NULL, NULL, NULL);
711  Assert(enabled_given);
712 
713  if (!sub->slotname && enabled)
714  ereport(ERROR,
715  (errcode(ERRCODE_SYNTAX_ERROR),
716  errmsg("cannot enable subscription that does not have a slot name")));
717 
718  values[Anum_pg_subscription_subenabled - 1] =
719  BoolGetDatum(enabled);
720  replaces[Anum_pg_subscription_subenabled - 1] = true;
721 
722  if (enabled)
724 
725  update_tuple = true;
726  break;
727  }
728 
730  /* Load the library providing us libpq calls. */
731  load_file("libpqwalreceiver", false);
732  /* Check the connection info string. */
734 
735  values[Anum_pg_subscription_subconninfo - 1] =
737  replaces[Anum_pg_subscription_subconninfo - 1] = true;
738  update_tuple = true;
739  break;
740 
742  {
743  bool copy_data;
744  bool refresh;
745 
746  parse_subscription_options(stmt->options, NULL, NULL, NULL,
747  NULL, NULL, NULL, &copy_data,
748  NULL, &refresh);
749 
750  values[Anum_pg_subscription_subpublications - 1] =
752  replaces[Anum_pg_subscription_subpublications - 1] = true;
753 
754  update_tuple = true;
755 
756  /* Refresh if user asked us to. */
757  if (refresh)
758  {
759  if (!sub->enabled)
760  ereport(ERROR,
761  (errcode(ERRCODE_SYNTAX_ERROR),
762  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
763  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
764 
765  /* Make sure refresh sees the new list of publications. */
766  sub->publications = stmt->publication;
767 
768  AlterSubscription_refresh(sub, copy_data);
769  }
770 
771  break;
772  }
773 
775  {
776  bool copy_data;
777 
778  if (!sub->enabled)
779  ereport(ERROR,
780  (errcode(ERRCODE_SYNTAX_ERROR),
781  errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
782 
783  parse_subscription_options(stmt->options, NULL, NULL, NULL,
784  NULL, NULL, NULL, &copy_data,
785  NULL, NULL);
786 
787  AlterSubscription_refresh(sub, copy_data);
788 
789  break;
790  }
791 
792  default:
793  elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
794  stmt->kind);
795  }
796 
797  /* Update the catalog if needed. */
798  if (update_tuple)
799  {
800  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
801  replaces);
802 
803  CatalogTupleUpdate(rel, &tup->t_self, tup);
804 
805  heap_freetuple(tup);
806  }
807 
809 
810  ObjectAddressSet(myself, SubscriptionRelationId, subid);
811 
812  InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
813 
814  return myself;
815 }
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errhint(const char *fmt,...)
Definition: elog.c:1069
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define RelationGetDescr(relation)
Definition: rel.h:454
Oid GetUserId(void)
Definition: miscinit.c:380
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:266
int errcode(int sqlerrcode)
Definition: elog.c:608
static Datum publicationListToArray(List *publist)
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:615
FormData_pg_subscription * Form_pg_subscription
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3352
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
AlterSubscriptionType kind
Definition: parsenodes.h:3542
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
List * publications
int synchronous_commit
Definition: xact.c:82
#define ereport(elevel, rest)
Definition: elog.h:141
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1002
#define BoolGetDatum(X)
Definition: postgres.h:402
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:739
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:224
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
#define CStringGetTextDatum(s)
Definition: builtins.h:83
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:929
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:176
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
static void AlterSubscription_refresh(Subscription *sub, bool copy_data)
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5317

◆ AlterSubscription_refresh()

static void AlterSubscription_refresh ( Subscription sub,
bool  copy_data 
)
static

Definition at line 512 of file subscriptioncmds.c.

References AccessShareLock, AddSubscriptionRelState(), CheckSubscriptionRelkind(), Subscription::conninfo, DEBUG1, ereport, errmsg(), ERROR, fetch_table_list(), get_namespace_name(), get_rel_name(), get_rel_namespace(), get_rel_relkind(), GetSubscriptionRelations(), InvalidXLogRecPtr, lfirst, list_length(), load_file(), logicalrep_worker_stop_at_commit(), Subscription::name, Subscription::oid, oid_cmp(), palloc(), Subscription::publications, qsort, RangeVarGetRelid, SubscriptionRelState::relid, RangeVar::relname, RemoveSubscriptionRel(), RangeVar::schemaname, walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by AlterSubscription().

513 {
514  char *err;
515  List *pubrel_names;
516  List *subrel_states;
517  Oid *subrel_local_oids;
518  Oid *pubrel_local_oids;
519  ListCell *lc;
520  int off;
521 
522  /* Load the library providing us libpq calls. */
523  load_file("libpqwalreceiver", false);
524 
525  /* Try to connect to the publisher. */
526  wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
527  if (!wrconn)
528  ereport(ERROR,
529  (errmsg("could not connect to the publisher: %s", err)));
530 
531  /* Get the table list from publisher. */
532  pubrel_names = fetch_table_list(wrconn, sub->publications);
533 
534  /* We are done with the remote side, close connection. */
536 
537  /* Get local table list. */
538  subrel_states = GetSubscriptionRelations(sub->oid);
539 
540  /*
541  * Build qsorted array of local table oids for faster lookup. This can
542  * potentially contain all tables in the database so speed of lookup is
543  * important.
544  */
545  subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
546  off = 0;
547  foreach(lc, subrel_states)
548  {
550 
551  subrel_local_oids[off++] = relstate->relid;
552  }
553  qsort(subrel_local_oids, list_length(subrel_states),
554  sizeof(Oid), oid_cmp);
555 
556  /*
557  * Walk over the remote tables and try to match them to locally known
558  * tables. If the table is not known locally create a new state for it.
559  *
560  * Also builds array of local oids of remote tables for the next step.
561  */
562  off = 0;
563  pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
564 
565  foreach(lc, pubrel_names)
566  {
567  RangeVar *rv = (RangeVar *) lfirst(lc);
568  Oid relid;
569 
570  relid = RangeVarGetRelid(rv, AccessShareLock, false);
571 
572  /* Check for supported relkind. */
574  rv->schemaname, rv->relname);
575 
576  pubrel_local_oids[off++] = relid;
577 
578  if (!bsearch(&relid, subrel_local_oids,
579  list_length(subrel_states), sizeof(Oid), oid_cmp))
580  {
581  AddSubscriptionRelState(sub->oid, relid,
582  copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
584  ereport(DEBUG1,
585  (errmsg("table \"%s.%s\" added to subscription \"%s\"",
586  rv->schemaname, rv->relname, sub->name)));
587  }
588  }
589 
590  /*
591  * Next remove state for tables we should not care about anymore using the
592  * data we collected above
593  */
594  qsort(pubrel_local_oids, list_length(pubrel_names),
595  sizeof(Oid), oid_cmp);
596 
597  for (off = 0; off < list_length(subrel_states); off++)
598  {
599  Oid relid = subrel_local_oids[off];
600 
601  if (!bsearch(&relid, pubrel_local_oids,
602  list_length(pubrel_names), sizeof(Oid), oid_cmp))
603  {
604  RemoveSubscriptionRel(sub->oid, relid);
605 
607 
608  ereport(DEBUG1,
609  (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
611  get_rel_name(relid),
612  sub->name)));
613  }
614  }
615 }
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
WalReceiverConn * wrconn
Definition: worker.c:101
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
void RemoveSubscriptionRel(Oid subid, Oid relid)
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:63
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1805
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1754
#define AccessShareLock
Definition: lockdefs.h:36
unsigned int Oid
Definition: postgres_ext.h:31
char * schemaname
Definition: primnodes.h:67
char * relname
Definition: primnodes.h:68
#define ERROR
Definition: elog.h:43
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3094
List * GetSubscriptionRelations(Oid subid)
List * publications
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
#define ereport(elevel, rest)
Definition: elog.h:141
#define lfirst(lc)
Definition: pg_list.h:190
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
int oid_cmp(const void *p1, const void *p2)
Definition: oid.c:336
static int list_length(const List *l)
Definition: pg_list.h:169
#define walrcv_disconnect(conn)
Definition: walreceiver.h:292
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
void logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
Definition: launcher.c:549
#define qsort(a, b, c, d)
Definition: port.h:491
Definition: pg_list.h:50
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1730
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:264

◆ AlterSubscriptionOwner()

ObjectAddress AlterSubscriptionOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 1064 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), HeapTupleIsValid, MyDatabaseId, ObjectAddressSet, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONNAME, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

1065 {
1066  Oid subid;
1067  HeapTuple tup;
1068  Relation rel;
1069  ObjectAddress address;
1070  Form_pg_subscription form;
1071 
1072  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1073 
1076 
1077  if (!HeapTupleIsValid(tup))
1078  ereport(ERROR,
1079  (errcode(ERRCODE_UNDEFINED_OBJECT),
1080  errmsg("subscription \"%s\" does not exist", name)));
1081 
1082  form = (Form_pg_subscription) GETSTRUCT(tup);
1083  subid = form->oid;
1084 
1085  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1086 
1087  ObjectAddressSet(address, SubscriptionRelationId, subid);
1088 
1089  heap_freetuple(tup);
1090 
1092 
1093  return address;
1094 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
int errcode(int sqlerrcode)
Definition: elog.c:608
FormData_pg_subscription * Form_pg_subscription
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
#define ereport(elevel, rest)
Definition: elog.h:141
Oid MyDatabaseId
Definition: globals.c:85
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
const char * name
Definition: encode.c:521
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:176
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterSubscriptionOwner_internal()

static void AlterSubscriptionOwner_internal ( Relation  rel,
HeapTuple  tup,
Oid  newOwnerId 
)
static

Definition at line 1027 of file subscriptioncmds.c.

References aclcheck_error(), ACLCHECK_NOT_OWNER, CatalogTupleUpdate(), changeDependencyOnOwner(), ereport, errcode(), errhint(), errmsg(), ERROR, GETSTRUCT, GetUserId(), InvokeObjectPostAlterHook, NameStr, OBJECT_SUBSCRIPTION, pg_subscription_ownercheck(), superuser_arg(), and HeapTupleData::t_self.

Referenced by AlterSubscriptionOwner(), and AlterSubscriptionOwner_oid().

1028 {
1029  Form_pg_subscription form;
1030 
1031  form = (Form_pg_subscription) GETSTRUCT(tup);
1032 
1033  if (form->subowner == newOwnerId)
1034  return;
1035 
1036  if (!pg_subscription_ownercheck(form->oid, GetUserId()))
1038  NameStr(form->subname));
1039 
1040  /* New owner must be a superuser */
1041  if (!superuser_arg(newOwnerId))
1042  ereport(ERROR,
1043  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1044  errmsg("permission denied to change owner of subscription \"%s\"",
1045  NameStr(form->subname)),
1046  errhint("The owner of a subscription must be a superuser.")));
1047 
1048  form->subowner = newOwnerId;
1049  CatalogTupleUpdate(rel, &tup->t_self, tup);
1050 
1051  /* Update owner dependency reference */
1052  changeDependencyOnOwner(SubscriptionRelationId,
1053  form->oid,
1054  newOwnerId);
1055 
1056  InvokeObjectPostAlterHook(SubscriptionRelationId,
1057  form->oid, 0);
1058 }
int errhint(const char *fmt,...)
Definition: elog.c:1069
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Oid GetUserId(void)
Definition: miscinit.c:380
int errcode(int sqlerrcode)
Definition: elog.c:608
FormData_pg_subscription * Form_pg_subscription
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:309
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3352
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define ereport(elevel, rest)
Definition: elog.h:141
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:224
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define NameStr(name)
Definition: c.h:616
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5317

◆ AlterSubscriptionOwner_oid()

void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 1100 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, table_close(), and table_open().

Referenced by shdepReassignOwned().

1101 {
1102  HeapTuple tup;
1103  Relation rel;
1104 
1105  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1106 
1108 
1109  if (!HeapTupleIsValid(tup))
1110  ereport(ERROR,
1111  (errcode(ERRCODE_UNDEFINED_OBJECT),
1112  errmsg("subscription with OID %u does not exist", subid)));
1113 
1114  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1115 
1116  heap_freetuple(tup);
1117 
1119 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errcode(int sqlerrcode)
Definition: elog.c:608
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define ereport(elevel, rest)
Definition: elog.h:141
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:822
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ CreateSubscription()

ObjectAddress CreateSubscription ( CreateSubscriptionStmt stmt,
bool  isTopLevel 
)

Definition at line 307 of file subscriptioncmds.c.

References AccessShareLock, AddSubscriptionRelState(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleInsert(), CheckSubscriptionRelkind(), connect, CreateSubscriptionStmt::conninfo, create_slot, CRS_NOEXPORT_SNAPSHOT, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, elog, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, fetch_table_list(), get_rel_relkind(), GetNewOidWithIndex(), GetSysCacheOid2, GetUserId(), heap_form_tuple(), heap_freetuple(), InvalidXLogRecPtr, InvokeObjectPostCreateHook, lfirst, load_file(), MyDatabaseId, NAMEDATALEN, namein(), NOTICE, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, CreateSubscriptionStmt::options, parse_subscription_options(), PG_END_TRY, PG_FINALLY, PG_TRY, PreventInTransactionBlock(), CreateSubscriptionStmt::publication, publicationListToArray(), RangeVarGetRelid, recordDependencyOnOwner(), RelationGetDescr, RangeVar::relname, replorigin_create(), RowExclusiveLock, RangeVar::schemaname, snprintf, CreateSubscriptionStmt::subname, SUBSCRIPTIONNAME, SubscriptionObjectIndexId, superuser(), synchronous_commit, table_close(), table_open(), values, walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, WARNING, and wrconn.

Referenced by ProcessUtilitySlow().

308 {
309  Relation rel;
310  ObjectAddress myself;
311  Oid subid;
312  bool nulls[Natts_pg_subscription];
313  Datum values[Natts_pg_subscription];
314  Oid owner = GetUserId();
315  HeapTuple tup;
316  bool connect;
317  bool enabled_given;
318  bool enabled;
319  bool copy_data;
320  char *synchronous_commit;
321  char *conninfo;
322  char *slotname;
323  bool slotname_given;
324  char originname[NAMEDATALEN];
325  bool create_slot;
326  List *publications;
327 
328  /*
329  * Parse and check options.
330  *
331  * Connection and publication should not be specified here.
332  */
333  parse_subscription_options(stmt->options, &connect, &enabled_given,
334  &enabled, &create_slot, &slotname_given,
335  &slotname, &copy_data, &synchronous_commit,
336  NULL);
337 
338  /*
339  * Since creating a replication slot is not transactional, rolling back
340  * the transaction leaves the created replication slot. So we cannot run
341  * CREATE SUBSCRIPTION inside a transaction block if creating a
342  * replication slot.
343  */
344  if (create_slot)
345  PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
346 
347  if (!superuser())
348  ereport(ERROR,
349  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
350  (errmsg("must be superuser to create subscriptions"))));
351 
352  /*
353  * If built with appropriate switch, whine when regression-testing
354  * conventions for subscription names are violated.
355  */
356 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
357  if (strncmp(stmt->subname, "regress_", 8) != 0)
358  elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
359 #endif
360 
361  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
362 
363  /* Check if name is used */
364  subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
366  if (OidIsValid(subid))
367  {
368  ereport(ERROR,
370  errmsg("subscription \"%s\" already exists",
371  stmt->subname)));
372  }
373 
374  if (!slotname_given && slotname == NULL)
375  slotname = stmt->subname;
376 
377  /* The default for synchronous_commit of subscriptions is off. */
378  if (synchronous_commit == NULL)
379  synchronous_commit = "off";
380 
381  conninfo = stmt->conninfo;
382  publications = stmt->publication;
383 
384  /* Load the library providing us libpq calls. */
385  load_file("libpqwalreceiver", false);
386 
387  /* Check the connection info string. */
388  walrcv_check_conninfo(conninfo);
389 
390  /* Everything ok, form a new tuple. */
391  memset(values, 0, sizeof(values));
392  memset(nulls, false, sizeof(nulls));
393 
395  Anum_pg_subscription_oid);
396  values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
397  values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
398  values[Anum_pg_subscription_subname - 1] =
400  values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
401  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
402  values[Anum_pg_subscription_subconninfo - 1] =
403  CStringGetTextDatum(conninfo);
404  if (slotname)
405  values[Anum_pg_subscription_subslotname - 1] =
407  else
408  nulls[Anum_pg_subscription_subslotname - 1] = true;
409  values[Anum_pg_subscription_subsynccommit - 1] =
410  CStringGetTextDatum(synchronous_commit);
411  values[Anum_pg_subscription_subpublications - 1] =
412  publicationListToArray(publications);
413 
414  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
415 
416  /* Insert tuple into catalog. */
417  CatalogTupleInsert(rel, tup);
418  heap_freetuple(tup);
419 
420  recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
421 
422  snprintf(originname, sizeof(originname), "pg_%u", subid);
423  replorigin_create(originname);
424 
425  /*
426  * Connect to remote side to execute requested commands and fetch table
427  * info.
428  */
429  if (connect)
430  {
431  char *err;
433  List *tables;
434  ListCell *lc;
435  char table_state;
436 
437  /* Try to connect to the publisher. */
438  wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
439  if (!wrconn)
440  ereport(ERROR,
441  (errmsg("could not connect to the publisher: %s", err)));
442 
443  PG_TRY();
444  {
445  /*
446  * Set sync state based on if we were asked to do data copy or
447  * not.
448  */
449  table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
450 
451  /*
452  * Get the table list from publisher and build local table status
453  * info.
454  */
455  tables = fetch_table_list(wrconn, publications);
456  foreach(lc, tables)
457  {
458  RangeVar *rv = (RangeVar *) lfirst(lc);
459  Oid relid;
460 
461  relid = RangeVarGetRelid(rv, AccessShareLock, false);
462 
463  /* Check for supported relkind. */
465  rv->schemaname, rv->relname);
466 
467  AddSubscriptionRelState(subid, relid, table_state,
469  }
470 
471  /*
472  * If requested, create permanent slot for the subscription. We
473  * won't use the initial snapshot for anything, so no need to
474  * export it.
475  */
476  if (create_slot)
477  {
478  Assert(slotname);
479 
480  walrcv_create_slot(wrconn, slotname, false,
481  CRS_NOEXPORT_SNAPSHOT, NULL);
482  ereport(NOTICE,
483  (errmsg("created replication slot \"%s\" on publisher",
484  slotname)));
485  }
486  }
487  PG_FINALLY();
488  {
489  walrcv_disconnect(wrconn);
490  }
491  PG_END_TRY();
492  }
493  else
495  /* translator: %s is an SQL ALTER statement */
496  (errmsg("tables were not subscribed, you will have to run %s to subscribe the tables",
497  "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
498 
500 
501  if (enabled)
503 
504  ObjectAddressSet(myself, SubscriptionRelationId, subid);
505 
506  InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
507 
508  return myself;
509 }
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:322
WalReceiverConn * wrconn
Definition: worker.c:101
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
#define RelationGetDescr(relation)
Definition: rel.h:454
Oid GetUserId(void)
Definition: miscinit.c:380
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:63
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:266
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1805
#define AccessShareLock
Definition: lockdefs.h:36
static bool create_slot
int errcode(int sqlerrcode)
Definition: elog.c:608
bool superuser(void)
Definition: superuser.c:46
static Datum publicationListToArray(List *publist)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define connect(s, name, namelen)
Definition: win32_port.h:435
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:615
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:164
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:286
#define OidIsValid(objectId)
Definition: c.h:645
char * schemaname
Definition: primnodes.h:67
#define NAMEDATALEN
char * relname
Definition: primnodes.h:68
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3331
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
RepOriginId replorigin_create(char *roname)
Definition: origin.c:239
int synchronous_commit
Definition: xact.c:82
#define ereport(elevel, rest)
Definition: elog.h:141
#define WARNING
Definition: elog.h:40
#define PG_FINALLY()
Definition: elog.h:339
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
#define BoolGetDatum(X)
Definition: postgres.h:402
#define NOTICE
Definition: elog.h:37
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:194
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define walrcv_disconnect(conn)
Definition: walreceiver.h:292
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
#define CStringGetTextDatum(s)
Definition: builtins.h:83
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:929
#define PG_TRY()
Definition: elog.h:322
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:31
Definition: pg_list.h:50
#define snprintf
Definition: port.h:192
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:183
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
#define PG_END_TRY()
Definition: elog.h:347
#define SubscriptionObjectIndexId
Definition: indexing.h:361
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:264

◆ DropSubscription()

void DropSubscription ( DropSubscriptionStmt stmt,
bool  isTopLevel 
)

Definition at line 821 of file subscriptioncmds.c.

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, appendStringInfo(), Assert, CatalogTupleDelete(), CStringGetDatum, StringInfoData::data, DatumGetName, deleteSharedDependencyRecordsFor(), ereport, WalRcvExecResult::err, errcode(), errdetail(), errhint(), errmsg(), ERROR, EventTriggerSQLDropAddObject(), GETSTRUCT, GetUserId(), HeapTupleIsValid, initStringInfo(), InvalidOid, InvalidRepOriginId, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), DropSubscriptionStmt::missing_ok, MyDatabaseId, NAMEDATALEN, NameStr, NoLock, NOTICE, OBJECT_SUBSCRIPTION, ObjectAddressSet, pfree(), PG_END_TRY, PG_FINALLY, pg_subscription_ownercheck(), PG_TRY, PreventInTransactionBlock(), pstrdup(), quote_identifier(), ReleaseSysCache(), LogicalRepWorker::relid, RemoveSubscriptionRel(), replorigin_by_name(), replorigin_drop(), SearchSysCache2(), snprintf, WalRcvExecResult::status, LogicalRepWorker::subid, subname, DropSubscriptionStmt::subname, SUBSCRIPTIONNAME, SUBSCRIPTIONOID, SysCacheGetAttr(), HeapTupleData::t_self, table_close(), table_open(), TextDatumGetCString, walrcv_clear_result(), walrcv_connect, walrcv_disconnect, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by ProcessUtilitySlow().

822 {
823  Relation rel;
824  ObjectAddress myself;
825  HeapTuple tup;
826  Oid subid;
827  Datum datum;
828  bool isnull;
829  char *subname;
830  char *conninfo;
831  char *slotname;
832  List *subworkers;
833  ListCell *lc;
834  char originname[NAMEDATALEN];
835  char *err = NULL;
836  RepOriginId originid;
837  WalReceiverConn *wrconn = NULL;
838  StringInfoData cmd;
840 
841  /*
842  * Lock pg_subscription with AccessExclusiveLock to ensure that the
843  * launcher doesn't restart new worker during dropping the subscription
844  */
845  rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
846 
848  CStringGetDatum(stmt->subname));
849 
850  if (!HeapTupleIsValid(tup))
851  {
852  table_close(rel, NoLock);
853 
854  if (!stmt->missing_ok)
855  ereport(ERROR,
856  (errcode(ERRCODE_UNDEFINED_OBJECT),
857  errmsg("subscription \"%s\" does not exist",
858  stmt->subname)));
859  else
860  ereport(NOTICE,
861  (errmsg("subscription \"%s\" does not exist, skipping",
862  stmt->subname)));
863 
864  return;
865  }
866 
867  form = (Form_pg_subscription) GETSTRUCT(tup);
868  subid = form->oid;
869 
870  /* must be owner */
871  if (!pg_subscription_ownercheck(subid, GetUserId()))
873  stmt->subname);
874 
875  /* DROP hook for the subscription being removed */
876  InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
877 
878  /*
879  * Lock the subscription so nobody else can do anything with it (including
880  * the replication workers).
881  */
882  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
883 
884  /* Get subname */
885  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
886  Anum_pg_subscription_subname, &isnull);
887  Assert(!isnull);
888  subname = pstrdup(NameStr(*DatumGetName(datum)));
889 
890  /* Get conninfo */
891  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
892  Anum_pg_subscription_subconninfo, &isnull);
893  Assert(!isnull);
894  conninfo = TextDatumGetCString(datum);
895 
896  /* Get slotname */
897  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
898  Anum_pg_subscription_subslotname, &isnull);
899  if (!isnull)
900  slotname = pstrdup(NameStr(*DatumGetName(datum)));
901  else
902  slotname = NULL;
903 
904  /*
905  * Since dropping a replication slot is not transactional, the replication
906  * slot stays dropped even if the transaction rolls back. So we cannot
907  * run DROP SUBSCRIPTION inside a transaction block if dropping the
908  * replication slot.
909  *
910  * XXX The command name should really be something like "DROP SUBSCRIPTION
911  * of a subscription that is associated with a replication slot", but we
912  * don't have the proper facilities for that.
913  */
914  if (slotname)
915  PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
916 
917 
918  ObjectAddressSet(myself, SubscriptionRelationId, subid);
919  EventTriggerSQLDropAddObject(&myself, true, true);
920 
921  /* Remove the tuple from catalog. */
922  CatalogTupleDelete(rel, &tup->t_self);
923 
924  ReleaseSysCache(tup);
925 
926  /*
927  * Stop all the subscription workers immediately.
928  *
929  * This is necessary if we are dropping the replication slot, so that the
930  * slot becomes accessible.
931  *
932  * It is also necessary if the subscription is disabled and was disabled
933  * in the same transaction. Then the workers haven't seen the disabling
934  * yet and will still be running, leading to hangs later when we want to
935  * drop the replication origin. If the subscription was disabled before
936  * this transaction, then there shouldn't be any workers left, so this
937  * won't make a difference.
938  *
939  * New workers won't be started because we hold an exclusive lock on the
940  * subscription till the end of the transaction.
941  */
942  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
943  subworkers = logicalrep_workers_find(subid, false);
944  LWLockRelease(LogicalRepWorkerLock);
945  foreach(lc, subworkers)
946  {
948 
950  }
951  list_free(subworkers);
952 
953  /* Clean up dependencies */
954  deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
955 
956  /* Remove any associated relation synchronization states. */
958 
959  /* Remove the origin tracking if exists. */
960  snprintf(originname, sizeof(originname), "pg_%u", subid);
961  originid = replorigin_by_name(originname, true);
962  if (originid != InvalidRepOriginId)
963  replorigin_drop(originid, false);
964 
965  /*
966  * If there is no slot associated with the subscription, we can finish
967  * here.
968  */
969  if (!slotname)
970  {
971  table_close(rel, NoLock);
972  return;
973  }
974 
975  /*
976  * Otherwise drop the replication slot at the publisher node using the
977  * replication connection.
978  */
979  load_file("libpqwalreceiver", false);
980 
981  initStringInfo(&cmd);
982  appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
983 
984  wrconn = walrcv_connect(conninfo, true, subname, &err);
985  if (wrconn == NULL)
986  ereport(ERROR,
987  (errmsg("could not connect to publisher when attempting to "
988  "drop the replication slot \"%s\"", slotname),
989  errdetail("The error was: %s", err),
990  /* translator: %s is an SQL ALTER command */
991  errhint("Use %s to disassociate the subscription from the slot.",
992  "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
993 
994  PG_TRY();
995  {
996  WalRcvExecResult *res;
997 
998  res = walrcv_exec(wrconn, cmd.data, 0, NULL);
999 
1000  if (res->status != WALRCV_OK_COMMAND)
1001  ereport(ERROR,
1002  (errmsg("could not drop the replication slot \"%s\" on publisher",
1003  slotname),
1004  errdetail("The error was: %s", res->err)));
1005  else
1006  ereport(NOTICE,
1007  (errmsg("dropped replication slot \"%s\" on publisher",
1008  slotname)));
1009 
1010  walrcv_clear_result(res);
1011  }
1012  PG_FINALLY();
1013  {
1014  walrcv_disconnect(wrconn);
1015  }
1016  PG_END_TRY();
1017 
1018  pfree(cmd.data);
1019 
1020  table_close(rel, NoLock);
1021 }
WalReceiverConn * wrconn
Definition: worker.c:101
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:259
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errhint(const char *fmt,...)
Definition: elog.c:1069
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
void RemoveSubscriptionRel(Oid subid, Oid relid)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10697
Oid GetUserId(void)
Definition: miscinit.c:380
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:331
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:160
uint16 RepOriginId
Definition: xlogdefs.h:58
int errcode(int sqlerrcode)
Definition: elog.c:608
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:269
FormData_pg_subscription * Form_pg_subscription
unsigned int Oid
Definition: postgres_ext.h:31
NameData subname
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:208
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define NAMEDATALEN
#define DatumGetName(X)
Definition: postgres.h:585
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3352
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:296
void pfree(void *pointer)
Definition: mcxt.c:1056
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define NoLock
Definition: lockdefs.h:34
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:452
int errdetail(const char *fmt,...)
Definition: elog.c:955
#define CStringGetDatum(X)
Definition: postgres.h:578
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3331
#define ereport(elevel, rest)
Definition: elog.h:141
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:907
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define PG_FINALLY()
Definition: elog.h:339
#define TextDatumGetCString(d)
Definition: builtins.h:84
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1377
Oid MyDatabaseId
Definition: globals.c:85
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1002
#define InvalidOid
Definition: postgres_ext.h:36
#define NOTICE
Definition: elog.h:37
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190
WalRcvExecStatus status
Definition: walreceiver.h:202
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:1127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define walrcv_disconnect(conn)
Definition: walreceiver.h:292
#define InvalidRepOriginId
Definition: origin.h:33
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:822
void list_free(List *list)
Definition: list.c:1377
#define NameStr(name)
Definition: c.h:616
#define PG_TRY()
Definition: elog.h:322
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
#define snprintf
Definition: port.h:192
#define PG_END_TRY()
Definition: elog.h:347
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:290
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5317
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:264

◆ fetch_table_list()

static List * fetch_table_list ( WalReceiverConn wrconn,
List publications 
)
static

Definition at line 1126 of file subscriptioncmds.c.

References appendStringInfoChar(), appendStringInfoString(), Assert, StringInfoData::data, ereport, WalRcvExecResult::err, errmsg(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), initStringInfo(), lappend(), lfirst, list_length(), makeRangeVar(), MakeSingleTupleTableSlot(), NIL, pfree(), pstrdup(), quote_literal_cstr(), relname, slot_getattr(), WalRcvExecResult::status, strVal, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, and WALRCV_OK_TUPLES.

Referenced by AlterSubscription_refresh(), and CreateSubscription().

1127 {
1128  WalRcvExecResult *res;
1129  StringInfoData cmd;
1130  TupleTableSlot *slot;
1131  Oid tableRow[2] = {TEXTOID, TEXTOID};
1132  ListCell *lc;
1133  bool first;
1134  List *tablelist = NIL;
1135 
1136  Assert(list_length(publications) > 0);
1137 
1138  initStringInfo(&cmd);
1139  appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
1140  " FROM pg_catalog.pg_publication_tables t\n"
1141  " WHERE t.pubname IN (");
1142  first = true;
1143  foreach(lc, publications)
1144  {
1145  char *pubname = strVal(lfirst(lc));
1146 
1147  if (first)
1148  first = false;
1149  else
1150  appendStringInfoString(&cmd, ", ");
1151 
1153  }
1154  appendStringInfoChar(&cmd, ')');
1155 
1156  res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
1157  pfree(cmd.data);
1158 
1159  if (res->status != WALRCV_OK_TUPLES)
1160  ereport(ERROR,
1161  (errmsg("could not receive list of replicated tables from the publisher: %s",
1162  res->err)));
1163 
1164  /* Process tables. */
1166  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1167  {
1168  char *nspname;
1169  char *relname;
1170  bool isnull;
1171  RangeVar *rv;
1172 
1173  nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1174  Assert(!isnull);
1175  relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1176  Assert(!isnull);
1177 
1178  rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
1179  tablelist = lappend(tablelist, rv);
1180 
1181  ExecClearTuple(slot);
1182  }
1184 
1185  walrcv_clear_result(res);
1186 
1187  return tablelist;
1188 }
#define NIL
Definition: pg_list.h:65
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1208
char * pstrdup(const char *in)
Definition: mcxt.c:1186
#define strVal(v)
Definition: value.h:54
NameData relname
Definition: pg_class.h:35
unsigned int Oid
Definition: postgres_ext.h:31
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:296
void pfree(void *pointer)
Definition: mcxt.c:1056
TupleDesc tupledesc
Definition: walreceiver.h:205
#define ERROR
Definition: elog.h:43
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1224
#define ereport(elevel, rest)
Definition: elog.h:141
List * lappend(List *list, void *datum)
Definition: list.c:322
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define TextDatumGetCString(d)
Definition: builtins.h:84
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:381
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
Tuplestorestate * tuplestore
Definition: walreceiver.h:204
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190
WalRcvExecStatus status
Definition: walreceiver.h:202
static int list_length(const List *l)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:822
Definition: pg_list.h:50
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition: makefuncs.c:420
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:290

◆ parse_subscription_options()

static void parse_subscription_options ( List options,
bool connect,
bool enabled_given,
bool enabled,
bool create_slot,
bool slot_name_given,
char **  slot_name,
bool copy_data,
char **  synchronous_commit,
bool refresh 
)
static

Definition at line 57 of file subscriptioncmds.c.

References Assert, connect, defGetBoolean(), defGetString(), DefElem::defname, ereport, errcode(), errmsg(), ERROR, GUC_ACTION_SET, lfirst, PGC_BACKEND, PGC_S_TEST, and set_config_option().

Referenced by AlterSubscription(), and CreateSubscription().

62 {
63  ListCell *lc;
64  bool connect_given = false;
65  bool create_slot_given = false;
66  bool copy_data_given = false;
67  bool refresh_given = false;
68 
69  /* If connect is specified, the others also need to be. */
70  Assert(!connect || (enabled && create_slot && copy_data));
71 
72  if (connect)
73  *connect = true;
74  if (enabled)
75  {
76  *enabled_given = false;
77  *enabled = true;
78  }
79  if (create_slot)
80  *create_slot = true;
81  if (slot_name)
82  {
83  *slot_name_given = false;
84  *slot_name = NULL;
85  }
86  if (copy_data)
87  *copy_data = true;
89  *synchronous_commit = NULL;
90  if (refresh)
91  *refresh = true;
92 
93  /* Parse options */
94  foreach(lc, options)
95  {
96  DefElem *defel = (DefElem *) lfirst(lc);
97 
98  if (strcmp(defel->defname, "connect") == 0 && connect)
99  {
100  if (connect_given)
101  ereport(ERROR,
102  (errcode(ERRCODE_SYNTAX_ERROR),
103  errmsg("conflicting or redundant options")));
104 
105  connect_given = true;
106  *connect = defGetBoolean(defel);
107  }
108  else if (strcmp(defel->defname, "enabled") == 0 && enabled)
109  {
110  if (*enabled_given)
111  ereport(ERROR,
112  (errcode(ERRCODE_SYNTAX_ERROR),
113  errmsg("conflicting or redundant options")));
114 
115  *enabled_given = true;
116  *enabled = defGetBoolean(defel);
117  }
118  else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
119  {
120  if (create_slot_given)
121  ereport(ERROR,
122  (errcode(ERRCODE_SYNTAX_ERROR),
123  errmsg("conflicting or redundant options")));
124 
125  create_slot_given = true;
126  *create_slot = defGetBoolean(defel);
127  }
128  else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
129  {
130  if (*slot_name_given)
131  ereport(ERROR,
132  (errcode(ERRCODE_SYNTAX_ERROR),
133  errmsg("conflicting or redundant options")));
134 
135  *slot_name_given = true;
136  *slot_name = defGetString(defel);
137 
138  /* Setting slot_name = NONE is treated as no slot name. */
139  if (strcmp(*slot_name, "none") == 0)
140  *slot_name = NULL;
141  }
142  else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
143  {
144  if (copy_data_given)
145  ereport(ERROR,
146  (errcode(ERRCODE_SYNTAX_ERROR),
147  errmsg("conflicting or redundant options")));
148 
149  copy_data_given = true;
150  *copy_data = defGetBoolean(defel);
151  }
152  else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
154  {
155  if (*synchronous_commit)
156  ereport(ERROR,
157  (errcode(ERRCODE_SYNTAX_ERROR),
158  errmsg("conflicting or redundant options")));
159 
161 
162  /* Test if the given value is valid for synchronous_commit GUC. */
163  (void) set_config_option("synchronous_commit", *synchronous_commit,
165  false, 0, false);
166  }
167  else if (strcmp(defel->defname, "refresh") == 0 && refresh)
168  {
169  if (refresh_given)
170  ereport(ERROR,
171  (errcode(ERRCODE_SYNTAX_ERROR),
172  errmsg("conflicting or redundant options")));
173 
174  refresh_given = true;
175  *refresh = defGetBoolean(defel);
176  }
177  else
178  ereport(ERROR,
179  (errcode(ERRCODE_SYNTAX_ERROR),
180  errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
181  }
182 
183  /*
184  * We've been explicitly asked to not connect, that requires some
185  * additional processing.
186  */
187  if (connect && !*connect)
188  {
189  /* Check for incompatible options from the user. */
190  if (enabled && *enabled_given && *enabled)
191  ereport(ERROR,
192  (errcode(ERRCODE_SYNTAX_ERROR),
193  /*- translator: both %s are strings of the form "option = value" */
194  errmsg("%s and %s are mutually exclusive options",
195  "connect = false", "enabled = true")));
196 
197  if (create_slot && create_slot_given && *create_slot)
198  ereport(ERROR,
199  (errcode(ERRCODE_SYNTAX_ERROR),
200  errmsg("%s and %s are mutually exclusive options",
201  "connect = false", "create_slot = true")));
202 
203  if (copy_data && copy_data_given && *copy_data)
204  ereport(ERROR,
205  (errcode(ERRCODE_SYNTAX_ERROR),
206  errmsg("%s and %s are mutually exclusive options",
207  "connect = false", "copy_data = true")));
208 
209  /* Change the defaults of other options. */
210  *enabled = false;
211  *create_slot = false;
212  *copy_data = false;
213  }
214 
215  /*
216  * Do additional checking for disallowed combination when slot_name = NONE
217  * was used.
218  */
219  if (slot_name && *slot_name_given && !*slot_name)
220  {
221  if (enabled && *enabled_given && *enabled)
222  ereport(ERROR,
223  (errcode(ERRCODE_SYNTAX_ERROR),
224  /*- translator: both %s are strings of the form "option = value" */
225  errmsg("%s and %s are mutually exclusive options",
226  "slot_name = NONE", "enabled = true")));
227 
228  if (create_slot && create_slot_given && *create_slot)
229  ereport(ERROR,
230  (errcode(ERRCODE_SYNTAX_ERROR),
231  errmsg("%s and %s are mutually exclusive options",
232  "slot_name = NONE", "create_slot = true")));
233 
234  if (enabled && !*enabled_given && *enabled)
235  ereport(ERROR,
236  (errcode(ERRCODE_SYNTAX_ERROR),
237  /*- translator: both %s are strings of the form "option = value" */
238  errmsg("subscription with %s must also set %s",
239  "slot_name = NONE", "enabled = false")));
240 
241  if (create_slot && !create_slot_given && *create_slot)
242  ereport(ERROR,
243  (errcode(ERRCODE_SYNTAX_ERROR),
244  errmsg("subscription with %s must also set %s",
245  "slot_name = NONE", "create_slot = false")));
246  }
247 }
static bool create_slot
int errcode(int sqlerrcode)
Definition: elog.c:608
#define connect(s, name, namelen)
Definition: win32_port.h:435
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
char * defGetString(DefElem *def)
Definition: define.c:49
int synchronous_commit
Definition: xact.c:82
#define ereport(elevel, rest)
Definition: elog.h:141
#define Assert(condition)
Definition: c.h:739
#define lfirst(lc)
Definition: pg_list.h:190
int errmsg(const char *fmt,...)
Definition: elog.c:822
char * defname
Definition: parsenodes.h:730
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition: guc.c:6798

◆ publicationListToArray()

static Datum publicationListToArray ( List publist)
static

Definition at line 253 of file subscriptioncmds.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, construct_array(), CStringGetTextDatum, CurrentMemoryContext, ereport, errcode(), errmsg(), ERROR, lfirst, list_length(), MemoryContextDelete(), MemoryContextSwitchTo(), name, palloc(), PointerGetDatum, and strVal.

Referenced by AlterSubscription(), and CreateSubscription().

254 {
255  ArrayType *arr;
256  Datum *datums;
257  int j = 0;
258  ListCell *cell;
259  MemoryContext memcxt;
260  MemoryContext oldcxt;
261 
262  /* Create memory context for temporary allocations. */
264  "publicationListToArray to array",
266  oldcxt = MemoryContextSwitchTo(memcxt);
267 
268  datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
269 
270  foreach(cell, publist)
271  {
272  char *name = strVal(lfirst(cell));
273  ListCell *pcell;
274 
275  /* Check for duplicates. */
276  foreach(pcell, publist)
277  {
278  char *pname = strVal(lfirst(pcell));
279 
280  if (pcell == cell)
281  break;
282 
283  if (strcmp(name, pname) == 0)
284  ereport(ERROR,
285  (errcode(ERRCODE_SYNTAX_ERROR),
286  errmsg("publication name \"%s\" used more than once",
287  pname)));
288  }
289 
290  datums[j++] = CStringGetTextDatum(name);
291  }
292 
293  MemoryContextSwitchTo(oldcxt);
294 
295  arr = construct_array(datums, list_length(publist),
296  TEXTOID, -1, false, 'i');
297 
298  MemoryContextDelete(memcxt);
299 
300  return PointerGetDatum(arr);
301 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
#define PointerGetDatum(X)
Definition: postgres.h:556
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ArrayType * construct_array(Datum *elems, int nelems, Oid elmtype, int elmlen, bool elmbyval, char elmalign)
Definition: arrayfuncs.c:3291
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:608
#define ERROR
Definition: elog.h:43
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define ereport(elevel, rest)
Definition: elog.h:141
uintptr_t Datum
Definition: postgres.h:367
#define lfirst(lc)
Definition: pg_list.h:190
static int list_length(const List *l)
Definition: pg_list.h:169
const char * name
Definition: encode.c:521
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define CStringGetTextDatum(s)
Definition: builtins.h:83