PostgreSQL Source Code  git master
subscriptioncmds.c File Reference
Include dependency graph for subscriptioncmds.c:

Go to the source code of this file.

Functions

static List * fetch_table_list (WalReceiverConn *wrconn, List *publications)
 
static void parse_subscription_options (List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
 
static Datum publicationListToArray (List *publist)
 
ObjectAddress CreateSubscription (CreateSubscriptionStmt *stmt, bool isTopLevel)
 
static void AlterSubscription_refresh (Subscription *sub, bool copy_data)
 
ObjectAddress AlterSubscription (AlterSubscriptionStmt *stmt)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
static void AlterSubscriptionOwner_internal (Relation rel, HeapTuple tup, Oid newOwnerId)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 

Function Documentation

◆ AlterSubscription()

ObjectAddress AlterSubscription ( AlterSubscriptionStmt * stmt)

Definition at line 635 of file subscriptioncmds.c.

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, AlterSubscription_refresh(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleUpdate(), AlterSubscriptionStmt::conninfo, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, elog, Subscription::enabled, ereport, errcode(), errhint(), errmsg(), ERROR, GETSTRUCT, GetSubscription(), GetUserId(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvokeObjectPostAlterHook, AlterSubscriptionStmt::kind, load_file(), LockSharedObject(), MyDatabaseId, namein(), OBJECT_SUBSCRIPTION, ObjectAddressSet, AlterSubscriptionStmt::options, parse_subscription_options(), pg_subscription_ownercheck(), AlterSubscriptionStmt::publication, publicationListToArray(), Subscription::publications, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, AlterSubscriptionStmt::subname, SUBSCRIPTIONNAME, synchronous_commit, HeapTupleData::t_self, table_close(), table_open(), values, and walrcv_check_conninfo.

Referenced by ProcessUtilitySlow().

636 {
637  Relation rel;
638  ObjectAddress myself;
639  bool nulls[Natts_pg_subscription];
640  bool replaces[Natts_pg_subscription];
641  Datum values[Natts_pg_subscription];
642  HeapTuple tup;
643  Oid subid;
644  bool update_tuple = false;
645  Subscription *sub;
646  Form_pg_subscription form;
647 
648  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
649 
650  /* Fetch the existing tuple. */
651  tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
652  CStringGetDatum(stmt->subname));
653 
654  if (!HeapTupleIsValid(tup))
655  ereport(ERROR,
656  (errcode(ERRCODE_UNDEFINED_OBJECT),
657  errmsg("subscription \"%s\" does not exist",
658  stmt->subname)));
659 
660  form = (Form_pg_subscription) GETSTRUCT(tup);
661  subid = form->oid;
662 
663  /* must be owner */
664  if (!pg_subscription_ownercheck(subid, GetUserId()))
665  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
666  stmt->subname);
667 
668  sub = GetSubscription(subid, false);
669 
670  /* Lock the subscription so nobody else can do anything with it. */
671  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
672 
673  /* Form a new tuple. */
674  memset(values, 0, sizeof(values));
675  memset(nulls, false, sizeof(nulls));
676  memset(replaces, false, sizeof(replaces));
677 
678  switch (stmt->kind)
679  {
680  case ALTER_SUBSCRIPTION_OPTIONS:
681  {
682  char *slotname;
683  bool slotname_given;
684  char *synchronous_commit;
685 
686  parse_subscription_options(stmt->options, NULL, NULL, NULL,
687  NULL, &slotname_given, &slotname,
688  NULL, &synchronous_commit, NULL);
689 
690  if (slotname_given)
691  {
692  if (sub->enabled && !slotname)
693  ereport(ERROR,
694  (errcode(ERRCODE_SYNTAX_ERROR),
695  errmsg("cannot set %s for enabled subscription",
696  "slot_name = NONE")));
697 
698  if (slotname)
699  values[Anum_pg_subscription_subslotname - 1] =
700  DirectFunctionCall1(namein, CStringGetDatum(slotname));
701  else
702  nulls[Anum_pg_subscription_subslotname - 1] = true;
703  replaces[Anum_pg_subscription_subslotname - 1] = true;
704  }
705 
706  if (synchronous_commit)
707  {
708  values[Anum_pg_subscription_subsynccommit - 1] =
709  CStringGetTextDatum(synchronous_commit);
710  replaces[Anum_pg_subscription_subsynccommit - 1] = true;
711  }
712 
713  update_tuple = true;
714  break;
715  }
716 
717  case ALTER_SUBSCRIPTION_ENABLED:
718  {
719  bool enabled,
720  enabled_given;
721 
722  parse_subscription_options(stmt->options, NULL,
723  &enabled_given, &enabled, NULL,
724  NULL, NULL, NULL, NULL, NULL);
725  Assert(enabled_given);
726 
727  if (!sub->slotname && enabled)
728  ereport(ERROR,
729  (errcode(ERRCODE_SYNTAX_ERROR),
730  errmsg("cannot enable subscription that does not have a slot name")));
731 
732  values[Anum_pg_subscription_subenabled - 1] =
733  BoolGetDatum(enabled);
734  replaces[Anum_pg_subscription_subenabled - 1] = true;
735 
736  if (enabled)
737  ApplyLauncherWakeupAtCommit();
738 
739  update_tuple = true;
740  break;
741  }
742 
743  case ALTER_SUBSCRIPTION_CONNECTION:
744  /* Load the library providing us libpq calls. */
745  load_file("libpqwalreceiver", false);
746  /* Check the connection info string. */
747  walrcv_check_conninfo(stmt->conninfo);
748 
749  values[Anum_pg_subscription_subconninfo - 1] =
750  CStringGetTextDatum(stmt->conninfo);
751  replaces[Anum_pg_subscription_subconninfo - 1] = true;
752  update_tuple = true;
753  break;
754 
755  case ALTER_SUBSCRIPTION_PUBLICATION:
756  {
757  bool copy_data;
758  bool refresh;
759 
760  parse_subscription_options(stmt->options, NULL, NULL, NULL,
761  NULL, NULL, NULL, &copy_data,
762  NULL, &refresh);
763 
764  values[Anum_pg_subscription_subpublications - 1] =
765  publicationListToArray(stmt->publication);
766  replaces[Anum_pg_subscription_subpublications - 1] = true;
767 
768  update_tuple = true;
769 
770  /* Refresh if user asked us to. */
771  if (refresh)
772  {
773  if (!sub->enabled)
774  ereport(ERROR,
775  (errcode(ERRCODE_SYNTAX_ERROR),
776  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
777  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
778 
779  /* Make sure refresh sees the new list of publications. */
780  sub->publications = stmt->publication;
781 
782  AlterSubscription_refresh(sub, copy_data);
783  }
784 
785  break;
786  }
787 
788  case ALTER_SUBSCRIPTION_REFRESH:
789  {
790  bool copy_data;
791 
792  if (!sub->enabled)
793  ereport(ERROR,
794  (errcode(ERRCODE_SYNTAX_ERROR),
795  errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
796 
797  parse_subscription_options(stmt->options, NULL, NULL, NULL,
798  NULL, NULL, NULL, &copy_data,
799  NULL, NULL);
800 
801  AlterSubscription_refresh(sub, copy_data);
802 
803  break;
804  }
805 
806  default:
807  elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
808  stmt->kind);
809  }
810 
811  /* Update the catalog if needed. */
812  if (update_tuple)
813  {
814  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
815  replaces);
816 
817  CatalogTupleUpdate(rel, &tup->t_self, tup);
818 
819  heap_freetuple(tup);
820  }
821 
822  table_close(rel, RowExclusiveLock);
823 
824  ObjectAddressSet(myself, SubscriptionRelationId, subid);
825 
826  InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
827 
828  return myself;
829 }
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errhint(const char *fmt,...)
Definition: elog.c:974
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define RelationGetDescr(relation)
Definition: rel.h:442
Oid GetUserId(void)
Definition: miscinit.c:380
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:257
int errcode(int sqlerrcode)
Definition: elog.c:570
static Datum publicationListToArray(List *publist)
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:616
FormData_pg_subscription * Form_pg_subscription
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3353
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
AlterSubscriptionType kind
Definition: parsenodes.h:3508
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
List * publications
int synchronous_commit
Definition: xact.c:83
#define ereport(elevel, rest)
Definition: elog.h:141
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:163
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1004
#define BoolGetDatum(X)
Definition: postgres.h:402
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:732
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:224
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
#define CStringGetTextDatum(s)
Definition: builtins.h:83
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:955
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:176
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
static void AlterSubscription_refresh(Subscription *sub, bool copy_data)
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5319

◆ AlterSubscription_refresh()

static void AlterSubscription_refresh ( Subscription * sub,
bool  copy_data 
)
static

Definition at line 526 of file subscriptioncmds.c.

References AccessShareLock, AddSubscriptionRelState(), CheckSubscriptionRelkind(), Subscription::conninfo, DEBUG1, ereport, errmsg(), ERROR, fetch_table_list(), get_namespace_name(), get_rel_name(), get_rel_namespace(), get_rel_relkind(), GetSubscriptionRelations(), InvalidXLogRecPtr, lfirst, list_length(), load_file(), logicalrep_worker_stop_at_commit(), Subscription::name, Subscription::oid, oid_cmp(), palloc(), Subscription::publications, qsort, RangeVarGetRelid, SubscriptionRelState::relid, RangeVar::relname, RemoveSubscriptionRel(), RangeVar::schemaname, walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by AlterSubscription().

527 {
528  char *err;
529  List *pubrel_names;
530  List *subrel_states;
531  Oid *subrel_local_oids;
532  Oid *pubrel_local_oids;
533  ListCell *lc;
534  int off;
535 
536  /* Load the library providing us libpq calls. */
537  load_file("libpqwalreceiver", false);
538 
539  /* Try to connect to the publisher. */
540  wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
541  if (!wrconn)
542  ereport(ERROR,
543  (errmsg("could not connect to the publisher: %s", err)));
544 
545  /* Get the table list from publisher. */
546  pubrel_names = fetch_table_list(wrconn, sub->publications);
547 
548  /* We are done with the remote side, close connection. */
549  walrcv_disconnect(wrconn);
550 
551  /* Get local table list. */
552  subrel_states = GetSubscriptionRelations(sub->oid);
553 
554  /*
555  * Build qsorted array of local table oids for faster lookup. This can
556  * potentially contain all tables in the database so speed of lookup is
557  * important.
558  */
559  subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
560  off = 0;
561  foreach(lc, subrel_states)
562  {
563  SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
564 
565  subrel_local_oids[off++] = relstate->relid;
566  }
567  qsort(subrel_local_oids, list_length(subrel_states),
568  sizeof(Oid), oid_cmp);
569 
570  /*
571  * Walk over the remote tables and try to match them to locally known
572  * tables. If the table is not known locally create a new state for it.
573  *
574  * Also builds array of local oids of remote tables for the next step.
575  */
576  off = 0;
577  pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
578 
579  foreach(lc, pubrel_names)
580  {
581  RangeVar *rv = (RangeVar *) lfirst(lc);
582  Oid relid;
583 
584  relid = RangeVarGetRelid(rv, AccessShareLock, false);
585 
586  /* Check for supported relkind. */
587  CheckSubscriptionRelkind(get_rel_relkind(relid),
588  rv->schemaname, rv->relname);
589 
590  pubrel_local_oids[off++] = relid;
591 
592  if (!bsearch(&relid, subrel_local_oids,
593  list_length(subrel_states), sizeof(Oid), oid_cmp))
594  {
595  AddSubscriptionRelState(sub->oid, relid,
596  copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
597  InvalidXLogRecPtr);
598  ereport(DEBUG1,
599  (errmsg("table \"%s.%s\" added to subscription \"%s\"",
600  rv->schemaname, rv->relname, sub->name)));
601  }
602  }
603 
604  /*
605  * Next remove state for tables we should not care about anymore using the
606  * data we collected above
607  */
608  qsort(pubrel_local_oids, list_length(pubrel_names),
609  sizeof(Oid), oid_cmp);
610 
611  for (off = 0; off < list_length(subrel_states); off++)
612  {
613  Oid relid = subrel_local_oids[off];
614 
615  if (!bsearch(&relid, pubrel_local_oids,
616  list_length(pubrel_names), sizeof(Oid), oid_cmp))
617  {
618  RemoveSubscriptionRel(sub->oid, relid);
619 
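620  logicalrep_worker_stop_at_commit(sub->oid, relid);
621 
622  ereport(DEBUG1,
623  (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
624  get_namespace_name(get_rel_namespace(relid)),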
625  get_rel_name(relid),
626  sub->name)));
627  }
628  }
629 }
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
WalReceiverConn * wrconn
Definition: worker.c:100
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
void RemoveSubscriptionRel(Oid subid, Oid relid)
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:63
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1805
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1754
#define AccessShareLock
Definition: lockdefs.h:36
unsigned int Oid
Definition: postgres_ext.h:31
char * schemaname
Definition: primnodes.h:67
char * relname
Definition: primnodes.h:68
#define ERROR
Definition: elog.h:43
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3094
List * GetSubscriptionRelations(Oid subid)
List * publications
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
#define ereport(elevel, rest)
Definition: elog.h:141
#define lfirst(lc)
Definition: pg_list.h:190
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
int oid_cmp(const void *p1, const void *p2)
Definition: oid.c:336
static int list_length(const List *l)
Definition: pg_list.h:169
#define walrcv_disconnect(conn)
Definition: walreceiver.h:281
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:784
void logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
Definition: launcher.c:561
#define qsort(a, b, c, d)
Definition: port.h:488
Definition: pg_list.h:50
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1730
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:255

◆ AlterSubscriptionOwner()

ObjectAddress AlterSubscriptionOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 1082 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), HeapTupleIsValid, MyDatabaseId, ObjectAddressSet, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONNAME, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

1083 {
1084  Oid subid;
1085  HeapTuple tup;
1086  Relation rel;
1087  ObjectAddress address;
1088  Form_pg_subscription form;
1089 
1090  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1091 
1092  tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1093  CStringGetDatum(name));
1094 
1095  if (!HeapTupleIsValid(tup))
1096  ereport(ERROR,
1097  (errcode(ERRCODE_UNDEFINED_OBJECT),
1098  errmsg("subscription \"%s\" does not exist", name)));
1099 
1100  form = (Form_pg_subscription) GETSTRUCT(tup);
1101  subid = form->oid;
1102 
1103  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1104 
1105  ObjectAddressSet(address, SubscriptionRelationId, subid);
1106 
1107  heap_freetuple(tup);
1108 
1109  table_close(rel, RowExclusiveLock);
1110 
1111  return address;
1112 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
int errcode(int sqlerrcode)
Definition: elog.c:570
FormData_pg_subscription * Form_pg_subscription
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
#define ereport(elevel, rest)
Definition: elog.h:141
Oid MyDatabaseId
Definition: globals.c:85
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
const char * name
Definition: encode.c:521
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:176
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterSubscriptionOwner_internal()

static void AlterSubscriptionOwner_internal ( Relation  rel,
HeapTuple  tup,
Oid  newOwnerId 
)
static

Definition at line 1045 of file subscriptioncmds.c.

References aclcheck_error(), ACLCHECK_NOT_OWNER, CatalogTupleUpdate(), changeDependencyOnOwner(), ereport, errcode(), errhint(), errmsg(), ERROR, GETSTRUCT, GetUserId(), InvokeObjectPostAlterHook, NameStr, OBJECT_SUBSCRIPTION, pg_subscription_ownercheck(), superuser_arg(), and HeapTupleData::t_self.

Referenced by AlterSubscriptionOwner(), and AlterSubscriptionOwner_oid().

1046 {
1047  Form_pg_subscription form;
1048 
1049  form = (Form_pg_subscription) GETSTRUCT(tup);
1050 
1051  if (form->subowner == newOwnerId)
1052  return;
1053 
1054  if (!pg_subscription_ownercheck(form->oid, GetUserId()))
1055  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1056  NameStr(form->subname));
1057 
1058  /* New owner must be a superuser */
1059  if (!superuser_arg(newOwnerId))
1060  ereport(ERROR,
1061  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1062  errmsg("permission denied to change owner of subscription \"%s\"",
1063  NameStr(form->subname)),
1064  errhint("The owner of a subscription must be a superuser.")));
1065 
1066  form->subowner = newOwnerId;
1067  CatalogTupleUpdate(rel, &tup->t_self, tup);
1068 
1069  /* Update owner dependency reference */
1070  changeDependencyOnOwner(SubscriptionRelationId,
1071  form->oid,
1072  newOwnerId);
1073 
1074  InvokeObjectPostAlterHook(SubscriptionRelationId,
1075  form->oid, 0);
1076 }
int errhint(const char *fmt,...)
Definition: elog.c:974
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Oid GetUserId(void)
Definition: miscinit.c:380
int errcode(int sqlerrcode)
Definition: elog.c:570
FormData_pg_subscription * Form_pg_subscription
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:310
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3353
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define ereport(elevel, rest)
Definition: elog.h:141
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:163
bool superuser_arg(Oid roleid)
Definition: superuser.c:57
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:224
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define NameStr(name)
Definition: c.h:609
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5319

◆ AlterSubscriptionOwner_oid()

void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 1118 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, table_close(), and table_open().

Referenced by shdepReassignOwned().

1119 {
1120  HeapTuple tup;
1121  Relation rel;
1122 
1123  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1124 
1125  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
1126 
1127  if (!HeapTupleIsValid(tup))
1128  ereport(ERROR,
1129  (errcode(ERRCODE_UNDEFINED_OBJECT),
1130  errmsg("subscription with OID %u does not exist", subid)));
1131 
1132  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1133 
1134  heap_freetuple(tup);
1135 
1136  table_close(rel, RowExclusiveLock);
1137 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errcode(int sqlerrcode)
Definition: elog.c:570
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define ereport(elevel, rest)
Definition: elog.h:141
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:784
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ CreateSubscription()

ObjectAddress CreateSubscription ( CreateSubscriptionStmt * stmt,
bool  isTopLevel 
)

Definition at line 315 of file subscriptioncmds.c.

References AccessShareLock, AddSubscriptionRelState(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleInsert(), CheckSubscriptionRelkind(), connect, CreateSubscriptionStmt::conninfo, create_slot, CRS_NOEXPORT_SNAPSHOT, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, elog, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, fetch_table_list(), get_rel_relkind(), GetNewOidWithIndex(), GetSysCacheOid2, GetUserId(), heap_form_tuple(), heap_freetuple(), InvalidXLogRecPtr, InvokeObjectPostCreateHook, lfirst, load_file(), MyDatabaseId, NAMEDATALEN, namein(), NOTICE, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, CreateSubscriptionStmt::options, parse_subscription_options(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PreventInTransactionBlock(), CreateSubscriptionStmt::publication, publicationListToArray(), RangeVarGetRelid, recordDependencyOnOwner(), RelationGetDescr, RangeVar::relname, replorigin_create(), RowExclusiveLock, RangeVar::schemaname, snprintf, CreateSubscriptionStmt::subname, SUBSCRIPTIONNAME, SubscriptionObjectIndexId, superuser(), synchronous_commit, table_close(), table_open(), values, walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, WARNING, and wrconn.

Referenced by ProcessUtilitySlow().

316 {
317  Relation rel;
318  ObjectAddress myself;
319  Oid subid;
320  bool nulls[Natts_pg_subscription];
321  Datum values[Natts_pg_subscription];
322  Oid owner = GetUserId();
323  HeapTuple tup;
324  bool connect;
325  bool enabled_given;
326  bool enabled;
327  bool copy_data;
328  char *synchronous_commit;
329  char *conninfo;
330  char *slotname;
331  bool slotname_given;
332  char originname[NAMEDATALEN];
333  bool create_slot;
334  List *publications;
335 
336  /*
337  * Parse and check options.
338  *
339  * Connection and publication should not be specified here.
340  */
341  parse_subscription_options(stmt->options, &connect, &enabled_given,
342  &enabled, &create_slot, &slotname_given,
343  &slotname, &copy_data, &synchronous_commit,
344  NULL);
345 
346  /*
347  * Since creating a replication slot is not transactional, rolling back
348  * the transaction leaves the created replication slot. So we cannot run
349  * CREATE SUBSCRIPTION inside a transaction block if creating a
350  * replication slot.
351  */
352  if (create_slot)
353  PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
354 
355  if (!superuser())
356  ereport(ERROR,
357  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
358  (errmsg("must be superuser to create subscriptions"))));
359 
360  /*
361  * If built with appropriate switch, whine when regression-testing
362  * conventions for subscription names are violated.
363  */
364 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
365  if (strncmp(stmt->subname, "regress_", 8) != 0)
366  elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
367 #endif
368 
369  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
370 
371  /* Check if name is used */
372  subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
373  MyDatabaseId, CStringGetDatum(stmt->subname));
374  if (OidIsValid(subid))
375  {
376  ereport(ERROR,
377  (errcode(ERRCODE_DUPLICATE_OBJECT),
378  errmsg("subscription \"%s\" already exists",
379  stmt->subname)));
380  }
381 
382  if (!slotname_given && slotname == NULL)
383  slotname = stmt->subname;
384 
385  /* The default for synchronous_commit of subscriptions is off. */
386  if (synchronous_commit == NULL)
387  synchronous_commit = "off";
388 
389  conninfo = stmt->conninfo;
390  publications = stmt->publication;
391 
392  /* Load the library providing us libpq calls. */
393  load_file("libpqwalreceiver", false);
394 
395  /* Check the connection info string. */
396  walrcv_check_conninfo(conninfo);
397 
398  /* Everything ok, form a new tuple. */
399  memset(values, 0, sizeof(values));
400  memset(nulls, false, sizeof(nulls));
401 
402  subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
403  Anum_pg_subscription_oid);
404  values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
405  values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
406  values[Anum_pg_subscription_subname - 1] =
407  DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
408  values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
409  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
410  values[Anum_pg_subscription_subconninfo - 1] =
411  CStringGetTextDatum(conninfo);
412  if (slotname)
413  values[Anum_pg_subscription_subslotname - 1] =
414  DirectFunctionCall1(namein, CStringGetDatum(slotname));
415  else
416  nulls[Anum_pg_subscription_subslotname - 1] = true;
417  values[Anum_pg_subscription_subsynccommit - 1] =
418  CStringGetTextDatum(synchronous_commit);
419  values[Anum_pg_subscription_subpublications - 1] =
420  publicationListToArray(publications);
421 
422  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
423 
424  /* Insert tuple into catalog. */
425  CatalogTupleInsert(rel, tup);
426  heap_freetuple(tup);
427 
428  recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
429 
430  snprintf(originname, sizeof(originname), "pg_%u", subid);
431  replorigin_create(originname);
432 
433  /*
434  * Connect to remote side to execute requested commands and fetch table
435  * info.
436  */
437  if (connect)
438  {
439  XLogRecPtr lsn;
440  char *err;
441  WalReceiverConn *wrconn;
442  List *tables;
443  ListCell *lc;
444  char table_state;
445 
446  /* Try to connect to the publisher. */
447  wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
448  if (!wrconn)
449  ereport(ERROR,
450  (errmsg("could not connect to the publisher: %s", err)));
451 
452  PG_TRY();
453  {
454  /*
455  * Set sync state based on if we were asked to do data copy or
456  * not.
457  */
458  table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
459 
460  /*
461  * Get the table list from publisher and build local table status
462  * info.
463  */
464  tables = fetch_table_list(wrconn, publications);
465  foreach(lc, tables)
466  {
467  RangeVar *rv = (RangeVar *) lfirst(lc);
468  Oid relid;
469 
470  relid = RangeVarGetRelid(rv, AccessShareLock, false);
471 
472  /* Check for supported relkind. */
473  CheckSubscriptionRelkind(get_rel_relkind(relid),
474  rv->schemaname, rv->relname);
475 
476  AddSubscriptionRelState(subid, relid, table_state,
477  InvalidXLogRecPtr);
478  }
479 
480  /*
481  * If requested, create permanent slot for the subscription. We
482  * won't use the initial snapshot for anything, so no need to
483  * export it.
484  */
485  if (create_slot)
486  {
487  Assert(slotname);
488 
489  walrcv_create_slot(wrconn, slotname, false,
490  CRS_NOEXPORT_SNAPSHOT, &lsn);
491  ereport(NOTICE,
492  (errmsg("created replication slot \"%s\" on publisher",
493  slotname)));
494  }
495  }
496  PG_CATCH();
497  {
498  /* Close the connection in case of failure. */
499  walrcv_disconnect(wrconn);
500  PG_RE_THROW();
501  }
502  PG_END_TRY();
503 
504  /* And we are done with the remote side. */
505  walrcv_disconnect(wrconn);
506  }
507  else
508  ereport(WARNING,
509  /* translator: %s is an SQL ALTER statement */
510  (errmsg("tables were not subscribed, you will have to run %s to subscribe the tables",
511  "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
512 
513  table_close(rel, RowExclusiveLock);
514 
515  if (enabled)
516  ApplyLauncherWakeupAtCommit();
517 
518  ObjectAddressSet(myself, SubscriptionRelationId, subid);
519 
520  InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
521 
522  return myself;
523 }
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:323
WalReceiverConn * wrconn
Definition: worker.c:100
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:145
#define RelationGetDescr(relation)
Definition: rel.h:442
Oid GetUserId(void)
Definition: miscinit.c:380
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:63
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:257
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1805
#define AccessShareLock
Definition: lockdefs.h:36
static bool create_slot
int errcode(int sqlerrcode)
Definition: elog.c:570
bool superuser(void)
Definition: superuser.c:47
static Datum publicationListToArray(List *publist)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define connect(s, name, namelen)
Definition: win32_port.h:444
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:616
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:165
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:277
#define OidIsValid(objectId)
Definition: c.h:638
char * schemaname
Definition: primnodes.h:67
#define NAMEDATALEN
char * relname
Definition: primnodes.h:68
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3328
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
RepOriginId replorigin_create(char *roname)
Definition: origin.c:243
int synchronous_commit
Definition: xact.c:83
#define ereport(elevel, rest)
Definition: elog.h:141
#define WARNING
Definition: elog.h:40
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
#define BoolGetDatum(X)
Definition: postgres.h:402
#define NOTICE
Definition: elog.h:37
#define PG_CATCH()
Definition: elog.h:310
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:194
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define PG_RE_THROW()
Definition: elog.h:331
#define walrcv_disconnect(conn)
Definition: walreceiver.h:281
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define elog(elevel,...)
Definition: elog.h:226
#define CStringGetTextDatum(s)
Definition: builtins.h:83
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:955
#define PG_TRY()
Definition: elog.h:301
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:33
Definition: pg_list.h:50
#define snprintf
Definition: port.h:192
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:183
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
#define PG_END_TRY()
Definition: elog.h:317
#define SubscriptionObjectIndexId
Definition: indexing.h:361
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:255

◆ DropSubscription()

void DropSubscription ( DropSubscriptionStmt stmt,
bool  isTopLevel 
)

Definition at line 835 of file subscriptioncmds.c.

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, appendStringInfo(), Assert, CatalogTupleDelete(), CStringGetDatum, StringInfoData::data, DatumGetName, deleteSharedDependencyRecordsFor(), ereport, WalRcvExecResult::err, errcode(), errdetail(), errhint(), errmsg(), ERROR, EventTriggerSQLDropAddObject(), GETSTRUCT, GetUserId(), HeapTupleIsValid, initStringInfo(), InvalidOid, InvalidRepOriginId, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), DropSubscriptionStmt::missing_ok, MyDatabaseId, NAMEDATALEN, NameStr, NoLock, NOTICE, OBJECT_SUBSCRIPTION, ObjectAddressSet, pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, pg_subscription_ownercheck(), PG_TRY, PreventInTransactionBlock(), pstrdup(), quote_identifier(), ReleaseSysCache(), LogicalRepWorker::relid, RemoveSubscriptionRel(), replorigin_by_name(), replorigin_drop(), SearchSysCache2(), snprintf, WalRcvExecResult::status, LogicalRepWorker::subid, subname, DropSubscriptionStmt::subname, SUBSCRIPTIONNAME, SUBSCRIPTIONOID, SysCacheGetAttr(), HeapTupleData::t_self, table_close(), table_open(), TextDatumGetCString, walrcv_clear_result(), walrcv_connect, walrcv_disconnect, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by ProcessUtilitySlow().

/*
 * DropSubscription -- listing of the DROP SUBSCRIPTION implementation.
 *
 * stmt supplies the subscription name (stmt->subname) and missing_ok; when
 * missing_ok is set, a nonexistent subscription produces a NOTICE instead of
 * an ERROR.  isTopLevel is only forwarded to PreventInTransactionBlock().
 *
 * Visible flow:
 *   1. Open pg_subscription and look the subscription up by name.
 *   2. Check ownership, fire the drop hook, and lock the subscription.
 *   3. Copy subname, conninfo and slotname out of the tuple before the
 *      catalog row is deleted and the syscache reference released.
 *   4. Delete the catalog row, then clean up workers, shared dependencies
 *      and the replication origin named "pg_<subid>".
 *   5. If a slot name is recorded, connect to the publisher and issue
 *      DROP_REPLICATION_SLOT; otherwise return after the local cleanup.
 *
 * NOTE(review): the embedded listing numbers skip 853, 861, 886, 961, 963
 * and 971, so those statements are not visible in this extract.  Each gap
 * is flagged below with a guess that must be confirmed against the real
 * source file.
 */
836 {
837  Relation rel;
838  ObjectAddress myself;
839  HeapTuple tup;
840  Oid subid;
841  Datum datum;
842  bool isnull;
843  char *subname;
844  char *conninfo;
845  char *slotname;
846  List *subworkers;
847  ListCell *lc;
848  char originname[NAMEDATALEN];
849  char *err = NULL;
850  RepOriginId originid;
851  WalReceiverConn *wrconn = NULL;
852  StringInfoData cmd;
/* NOTE(review): listing line 853 is missing; presumably the declaration of "form", which is assigned at line 881 -- confirm. */
854 
855  /*
856  * Lock pg_subscription with AccessExclusiveLock to ensure that the
857  * launcher does not start a new worker while the subscription is being
858  * dropped.
858  */
859  rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
860 
/* NOTE(review): listing line 861 is missing; presumably the SearchSysCache2() lookup that assigns tup, of which line 862 is the last argument -- confirm. */
862  CStringGetDatum(stmt->subname));
863 
864  if (!HeapTupleIsValid(tup))
865  {
/* Nothing to drop: close the catalog, then report ERROR or NOTICE depending on missing_ok. */
866  table_close(rel, NoLock);
867 
868  if (!stmt->missing_ok)
869  ereport(ERROR,
870  (errcode(ERRCODE_UNDEFINED_OBJECT),
871  errmsg("subscription \"%s\" does not exist",
872  stmt->subname)));
873  else
874  ereport(NOTICE,
875  (errmsg("subscription \"%s\" does not exist, skipping",
876  stmt->subname)));
877 
878  return;
879  }
880 
881  form = (Form_pg_subscription) GETSTRUCT(tup);
882  subid = form->oid;
883 
884  /* must be owner */
885  if (!pg_subscription_ownercheck(subid, GetUserId()))
/* NOTE(review): listing line 886 is missing; presumably the start of the aclcheck_error() call that line 887 completes -- confirm. */
887  stmt->subname);
888 
889  /* DROP hook for the subscription being removed */
890  InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
891 
892  /*
893  * Lock the subscription so nobody else can do anything with it (including
894  * the replication workers).
895  */
896  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
897 
/*
 * The three values below are copied (pstrdup / TextDatumGetCString) because
 * they are still needed after the tuple is released at line 938.
 */
898  /* Get subname */
899  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
900  Anum_pg_subscription_subname, &isnull);
901  Assert(!isnull);
902  subname = pstrdup(NameStr(*DatumGetName(datum)));
903 
904  /* Get conninfo */
905  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
906  Anum_pg_subscription_subconninfo, &isnull);
907  Assert(!isnull);
908  conninfo = TextDatumGetCString(datum);
909 
910  /* Get slotname; a NULL column means no slot is associated. */
911  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
912  Anum_pg_subscription_subslotname, &isnull);
913  if (!isnull)
914  slotname = pstrdup(NameStr(*DatumGetName(datum)));
915  else
916  slotname = NULL;
917 
918  /*
919  * Since dropping a replication slot is not transactional, the replication
920  * slot stays dropped even if the transaction rolls back. So we cannot
921  * run DROP SUBSCRIPTION inside a transaction block if dropping the
922  * replication slot.
923  *
924  * XXX The command name should really be something like "DROP SUBSCRIPTION
925  * of a subscription that is associated with a replication slot", but we
926  * don't have the proper facilities for that.
927  */
928  if (slotname)
929  PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
930 
931 
/* Report the subscription to the event-trigger machinery as a dropped object. */
932  ObjectAddressSet(myself, SubscriptionRelationId, subid);
933  EventTriggerSQLDropAddObject(&myself, true, true);
934 
935  /* Remove the tuple from catalog. */
936  CatalogTupleDelete(rel, &tup->t_self);
937 
938  ReleaseSysCache(tup);
939 
940  /*
941  * Stop all the subscription workers immediately.
942  *
943  * This is necessary if we are dropping the replication slot, so that the
944  * slot becomes accessible.
945  *
946  * It is also necessary if the subscription is disabled and was disabled
947  * in the same transaction. Then the workers haven't seen the disabling
948  * yet and will still be running, leading to hangs later when we want to
949  * drop the replication origin. If the subscription was disabled before
950  * this transaction, then there shouldn't be any workers left, so this
951  * won't make a difference.
952  *
953  * New workers won't be started because we hold an exclusive lock on the
954  * subscription till the end of the transaction.
955  */
/* The worker list is collected under LogicalRepWorkerLock; the lock is released before the loop runs. */
956  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
957  subworkers = logicalrep_workers_find(subid, false);
958  LWLockRelease(LogicalRepWorkerLock);
959  foreach(lc, subworkers)
960  {
/* NOTE(review): listing lines 961 and 963 are missing; presumably they fetch the worker via lfirst(lc) and call logicalrep_worker_stop() on it -- confirm. */
962 
964  }
965  list_free(subworkers);
966 
967  /* Clean up dependencies */
968  deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
969 
970  /* Remove any associated relation synchronization states. */
/* NOTE(review): listing line 971 is missing; presumably the RemoveSubscriptionRel() call named in the References list -- confirm. */
972 
973  /* Remove the replication origin tracking, if it exists. */
974  snprintf(originname, sizeof(originname), "pg_%u", subid);
975  originid = replorigin_by_name(originname, true);
976  if (originid != InvalidRepOriginId)
977  replorigin_drop(originid, false);
978 
979  /*
980  * If there is no slot associated with the subscription, we can finish
981  * here.
982  */
983  if (!slotname)
984  {
985  table_close(rel, NoLock);
986  return;
987  }
988 
989  /*
990  * Otherwise drop the replication slot at the publisher node using the
991  * replication connection.
992  */
993  load_file("libpqwalreceiver", false);
994 
/* Build the command first; slotname is passed through quote_identifier(). */
995  initStringInfo(&cmd);
996  appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
997 
998  wrconn = walrcv_connect(conninfo, true, subname, &err);
999  if (wrconn == NULL)
1000  ereport(ERROR,
1001  (errmsg("could not connect to publisher when attempting to "
1002  "drop the replication slot \"%s\"", slotname),
1003  errdetail("The error was: %s", err),
1004  /* translator: %s is an SQL ALTER command */
1005  errhint("Use %s to disassociate the subscription from the slot.",
1006  "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
1007 
/*
 * Run the command inside PG_TRY so that the connection is closed even when
 * the ERROR below (or one raised by walrcv_exec) propagates.
 */
1008  PG_TRY();
1009  {
1010  WalRcvExecResult *res;
1011 
1012  res = walrcv_exec(wrconn, cmd.data, 0, NULL);
1013 
1014  if (res->status != WALRCV_OK_COMMAND)
1015  ereport(ERROR,
1016  (errmsg("could not drop the replication slot \"%s\" on publisher",
1017  slotname),
1018  errdetail("The error was: %s", res->err)));
1019  else
1020  ereport(NOTICE,
1021  (errmsg("dropped replication slot \"%s\" on publisher",
1022  slotname)));
1023 
1024  walrcv_clear_result(res);
1025  }
1026  PG_CATCH();
1027  {
1028  /* Close the connection in case of failure */
1029  walrcv_disconnect(wrconn);
1030  PG_RE_THROW();
1031  }
1032  PG_END_TRY();
1033 
/* Success path: disconnect, free the command buffer, and close the catalog. */
1034  walrcv_disconnect(wrconn);
1035 
1036  pfree(cmd.data);
1037 
1038  table_close(rel, NoLock);
1039 }
WalReceiverConn * wrconn
Definition: worker.c:100
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:271
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errhint(const char *fmt,...)
Definition: elog.c:974
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
void RemoveSubscriptionRel(Oid subid, Oid relid)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10628
Oid GetUserId(void)
Definition: miscinit.c:380
char * pstrdup(const char *in)
Definition: mcxt.c:1161
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:335
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:154
uint16 RepOriginId
Definition: xlogdefs.h:58
int errcode(int sqlerrcode)
Definition: elog.c:570
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:269
FormData_pg_subscription * Form_pg_subscription
unsigned int Oid
Definition: postgres_ext.h:31
NameData subname
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:212
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1726
#define NAMEDATALEN
#define DatumGetName(X)
Definition: postgres.h:585
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3353
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:285
void pfree(void *pointer)
Definition: mcxt.c:1031
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define NoLock
Definition: lockdefs.h:34
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:464
int errdetail(const char *fmt,...)
Definition: elog.c:860
#define CStringGetDatum(X)
Definition: postgres.h:578
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3328
#define ereport(elevel, rest)
Definition: elog.h:141
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:908
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define TextDatumGetCString(d)
Definition: builtins.h:84
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1172
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1385
Oid MyDatabaseId
Definition: globals.c:85
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1004
#define InvalidOid
Definition: postgres_ext.h:36
#define NOTICE
Definition: elog.h:37
#define PG_CATCH()
Definition: elog.h:310
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
WalRcvExecStatus status
Definition: walreceiver.h:195
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:1135
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1122
#define PG_RE_THROW()
Definition: elog.h:331
#define walrcv_disconnect(conn)
Definition: walreceiver.h:281
#define InvalidRepOriginId
Definition: origin.h:33
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:784
void list_free(List *list)
Definition: list.c:1373
#define NameStr(name)
Definition: c.h:609
#define PG_TRY()
Definition: elog.h:301
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
#define snprintf
Definition: port.h:192
#define PG_END_TRY()
Definition: elog.h:317
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:279
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5319
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:255

◆ fetch_table_list()

static List * fetch_table_list ( WalReceiverConn wrconn,
List publications 
)
static

Definition at line 1144 of file subscriptioncmds.c.

References appendStringInfoChar(), appendStringInfoString(), Assert, StringInfoData::data, ereport, WalRcvExecResult::err, errmsg(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), initStringInfo(), lappend(), lfirst, list_length(), makeRangeVar(), MakeSingleTupleTableSlot(), NIL, pfree(), pstrdup(), quote_literal_cstr(), relname, slot_getattr(), WalRcvExecResult::status, strVal, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, and WALRCV_OK_TUPLES.

Referenced by AlterSubscription_refresh(), and CreateSubscription().

/*
 * fetch_table_list -- ask the remote side which tables the given
 * publications contain.
 *
 * wrconn is the connection the query is executed on (via walrcv_exec);
 * publications is a non-empty list of string nodes (read with strVal).
 *
 * Builds a "SELECT DISTINCT schemaname, tablename FROM
 * pg_catalog.pg_publication_tables WHERE pubname IN (...)" query, expects
 * two TEXT columns back, and returns a List of RangeVar nodes, one per
 * returned row (NIL when no rows come back).  Raises ERROR when the result
 * status is not WALRCV_OK_TUPLES.
 *
 * NOTE(review): the embedded listing numbers skip 1170, 1183 and 1201, so
 * those statements are not visible in this extract; each gap is flagged
 * below with a guess to confirm against the real source file.
 */
1145 {
1146  WalRcvExecResult *res;
1147  StringInfoData cmd;
1148  TupleTableSlot *slot;
1149  Oid tableRow[2] = {TEXTOID, TEXTOID};
1150  ListCell *lc;
1151  bool first;
1152  List *tablelist = NIL;
1153 
/* An empty list would produce "IN ()", which is not a valid query. */
1154  Assert(list_length(publications) > 0);
1155 
1156  initStringInfo(&cmd);
1157  appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
1158  " FROM pg_catalog.pg_publication_tables t\n"
1159  " WHERE t.pubname IN (");
/* "first" suppresses the ", " separator before the first list element. */
1160  first = true;
1161  foreach(lc, publications)
1162  {
1163  char *pubname = strVal(lfirst(lc));
1164 
1165  if (first)
1166  first = false;
1167  else
1168  appendStringInfoString(&cmd, ", ");
1169 
/* NOTE(review): listing line 1170 is missing; presumably it appends pubname to cmd as a quoted literal (quote_literal_cstr is in the References list) -- confirm. */
1171  }
1172  appendStringInfoChar(&cmd, ')');
1173 
/* The query text is no longer needed once it has been executed. */
1174  res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
1175  pfree(cmd.data);
1176 
1177  if (res->status != WALRCV_OK_TUPLES)
1178  ereport(ERROR,
1179  (errmsg("could not receive list of replicated tables from the publisher: %s",
1180  res->err)));
1181 
1182  /* Process tables. */
/* NOTE(review): listing line 1183 is missing; presumably it creates "slot" with MakeSingleTupleTableSlot() from res->tupledesc -- confirm. */
1184  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1185  {
1186  char *nspname;
1187  char *relname;
1188  bool isnull;
1189  RangeVar *rv;
1190 
/* Column 1 is the schema name, column 2 the table name, matching the SELECT list above. */
1191  nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1192  Assert(!isnull);
1193  relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1194  Assert(!isnull);
1195 
1196  rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
1197  tablelist = lappend(tablelist, rv);
1198 
1199  ExecClearTuple(slot);
1200  }
/* NOTE(review): listing line 1201 is missing; presumably it releases "slot" with ExecDropSingleTupleTableSlot() -- confirm. */
1202 
1203  walrcv_clear_result(res);
1204 
1205  return tablelist;
1206 }
#define NIL
Definition: pg_list.h:65
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:426
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1203
char * pstrdup(const char *in)
Definition: mcxt.c:1161
#define strVal(v)
Definition: value.h:54
NameData relname
Definition: pg_class.h:35
unsigned int Oid
Definition: postgres_ext.h:31
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:285
void pfree(void *pointer)
Definition: mcxt.c:1031
TupleDesc tupledesc
Definition: walreceiver.h:198
#define ERROR
Definition: elog.h:43
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:163
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1219
#define ereport(elevel, rest)
Definition: elog.h:141
List * lappend(List *list, void *datum)
Definition: list.c:321
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:175
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define TextDatumGetCString(d)
Definition: builtins.h:84
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:382
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
Tuplestorestate * tuplestore
Definition: walreceiver.h:197
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
WalRcvExecStatus status
Definition: walreceiver.h:195
static int list_length(const List *l)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:784
Definition: pg_list.h:50
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition: makefuncs.c:420
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:279

◆ parse_subscription_options()

static void parse_subscription_options ( List options,
bool connect,
bool enabled_given,
bool enabled,
bool create_slot,
bool slot_name_given,
char **  slot_name,
bool copy_data,
char **  synchronous_commit,
bool refresh 
)
static

Definition at line 65 of file subscriptioncmds.c.

References Assert, connect, defGetBoolean(), defGetString(), DefElem::defname, ereport, errcode(), errmsg(), ERROR, GUC_ACTION_SET, lfirst, PGC_BACKEND, PGC_S_TEST, and set_config_option().

Referenced by AlterSubscription(), and CreateSubscription().

/*
 * parse_subscription_options -- decode the option list of a subscription
 * command into the caller's output variables.
 *
 * options is a List of DefElem nodes.  Every other parameter is an output
 * pointer; a NULL pointer means the caller does not accept that option, so
 * naming it in the list ends in the "unrecognized subscription parameter"
 * error.  Visible defaults before parsing: connect, enabled, create_slot,
 * copy_data and refresh are true; slot_name is NULL; the *_given flags are
 * false.
 *
 * Raises ERROR (ERRCODE_SYNTAX_ERROR) for a repeated option, an unknown
 * option, or one of the mutually exclusive combinations checked after the
 * loop.
 *
 * NOTE(review): the embedded listing numbers skip 96, 161, 168 and 172, so
 * those lines are not visible in this extract; each gap is flagged below
 * with a guess to confirm against the real source file.
 */
70 {
71  ListCell *lc;
72  bool connect_given = false;
73  bool create_slot_given = false;
74  bool copy_data_given = false;
75  bool refresh_given = false;
76 
77  /* If connect is specified, the others also need to be. */
78  Assert(!connect || (enabled && create_slot && copy_data));
79 
/* Initialize the defaults for every option the caller accepts. */
80  if (connect)
81  *connect = true;
82  if (enabled)
83  {
84  *enabled_given = false;
85  *enabled = true;
86  }
87  if (create_slot)
88  *create_slot = true;
89  if (slot_name)
90  {
91  *slot_name_given = false;
92  *slot_name = NULL;
93  }
94  if (copy_data)
95  *copy_data = true;
/* NOTE(review): listing line 96 is missing; presumably an "if (synchronous_commit)" guard for the assignment on line 97 -- confirm. */
97  *synchronous_commit = NULL;
98  if (refresh)
99  *refresh = true;
100 
101  /* Parse options */
102  foreach(lc, options)
103  {
104  DefElem *defel = (DefElem *) lfirst(lc);
105 
/*
 * Each branch also tests its output pointer, so an option the caller did
 * not ask for falls through to the final "unrecognized" error.
 */
106  if (strcmp(defel->defname, "connect") == 0 && connect)
107  {
108  if (connect_given)
109  ereport(ERROR,
110  (errcode(ERRCODE_SYNTAX_ERROR),
111  errmsg("conflicting or redundant options")));
112 
113  connect_given = true;
114  *connect = defGetBoolean(defel);
115  }
116  else if (strcmp(defel->defname, "enabled") == 0 && enabled)
117  {
118  if (*enabled_given)
119  ereport(ERROR,
120  (errcode(ERRCODE_SYNTAX_ERROR),
121  errmsg("conflicting or redundant options")));
122 
123  *enabled_given = true;
124  *enabled = defGetBoolean(defel);
125  }
126  else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
127  {
128  if (create_slot_given)
129  ereport(ERROR,
130  (errcode(ERRCODE_SYNTAX_ERROR),
131  errmsg("conflicting or redundant options")));
132 
133  create_slot_given = true;
134  *create_slot = defGetBoolean(defel);
135  }
136  else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
137  {
138  if (*slot_name_given)
139  ereport(ERROR,
140  (errcode(ERRCODE_SYNTAX_ERROR),
141  errmsg("conflicting or redundant options")));
142 
143  *slot_name_given = true;
144  *slot_name = defGetString(defel);
145 
146  /* Setting slot_name = NONE is treated as no slot name. */
147  if (strcmp(*slot_name, "none") == 0)
148  *slot_name = NULL;
149  }
150  else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
151  {
152  if (copy_data_given)
153  ereport(ERROR,
154  (errcode(ERRCODE_SYNTAX_ERROR),
155  errmsg("conflicting or redundant options")));
156 
157  copy_data_given = true;
158  *copy_data = defGetBoolean(defel);
159  }
160  else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
/* NOTE(review): listing line 161 is missing; presumably the "synchronous_commit)" pointer test that closes this condition -- confirm. */
162  {
/* No separate *_given flag here: a non-NULL value means the option was already seen. */
163  if (*synchronous_commit)
164  ereport(ERROR,
165  (errcode(ERRCODE_SYNTAX_ERROR),
166  errmsg("conflicting or redundant options")));
167 
/* NOTE(review): listing line 168 is missing; presumably it stores defGetString(defel) into *synchronous_commit -- confirm. */
169 
170  /* Test if the given value is valid for synchronous_commit GUC. */
171  (void) set_config_option("synchronous_commit", *synchronous_commit,
/* NOTE(review): listing line 172 is missing; presumably the context, source and action arguments (PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET appear in the References list) -- confirm. */
173  false, 0, false);
174  }
175  else if (strcmp(defel->defname, "refresh") == 0 && refresh)
176  {
177  if (refresh_given)
178  ereport(ERROR,
179  (errcode(ERRCODE_SYNTAX_ERROR),
180  errmsg("conflicting or redundant options")));
181 
182  refresh_given = true;
183  *refresh = defGetBoolean(defel);
184  }
185  else
186  ereport(ERROR,
187  (errcode(ERRCODE_SYNTAX_ERROR),
188  errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
189  }
190 
191  /*
192  * We've been explicitly asked to not connect, that requires some
193  * additional processing.
194  */
195  if (connect && !*connect)
196  {
197  /* Check for incompatible options from the user. */
198  if (enabled && *enabled_given && *enabled)
199  ereport(ERROR,
200  (errcode(ERRCODE_SYNTAX_ERROR),
201  /*- translator: both %s are strings of the form "option = value" */
202  errmsg("%s and %s are mutually exclusive options",
203  "connect = false", "enabled = true")));
204 
205  if (create_slot && create_slot_given && *create_slot)
206  ereport(ERROR,
207  (errcode(ERRCODE_SYNTAX_ERROR),
208  errmsg("%s and %s are mutually exclusive options",
209  "connect = false", "create_slot = true")));
210 
211  if (copy_data && copy_data_given && *copy_data)
212  ereport(ERROR,
213  (errcode(ERRCODE_SYNTAX_ERROR),
214  errmsg("%s and %s are mutually exclusive options",
215  "connect = false", "copy_data = true")));
216 
217  /*
217  * Change the defaults of other options.  These pointers are dereferenced
217  * without a NULL test; the Assert at line 78 states that they are all
217  * non-NULL whenever connect is.
217  */
218  *enabled = false;
219  *create_slot = false;
220  *copy_data = false;
221  }
222 
223  /*
224  * Do additional checking for disallowed combination when slot_name = NONE
225  * was used.
226  */
227  if (slot_name && *slot_name_given && !*slot_name)
228  {
/* First pair: the user explicitly asked for a setting that needs a slot. */
229  if (enabled && *enabled_given && *enabled)
230  ereport(ERROR,
231  (errcode(ERRCODE_SYNTAX_ERROR),
232  /*- translator: both %s are strings of the form "option = value" */
233  errmsg("%s and %s are mutually exclusive options",
234  "slot_name = NONE", "enabled = true")));
235 
236  if (create_slot && create_slot_given && *create_slot)
237  ereport(ERROR,
238  (errcode(ERRCODE_SYNTAX_ERROR),
239  errmsg("%s and %s are mutually exclusive options",
240  "slot_name = NONE", "create_slot = true")));
241 
/*
 * Second pair: the value is still true only by default, so the user is told
 * to switch it off explicitly.  After "connect = false" the values were
 * already reset to false above, so these checks do not fire.
 */
242  if (enabled && !*enabled_given && *enabled)
243  ereport(ERROR,
244  (errcode(ERRCODE_SYNTAX_ERROR),
245  /*- translator: both %s are strings of the form "option = value" */
246  errmsg("subscription with %s must also set %s",
247  "slot_name = NONE", "enabled = false")));
248 
249  if (create_slot && !create_slot_given && *create_slot)
250  ereport(ERROR,
251  (errcode(ERRCODE_SYNTAX_ERROR),
252  errmsg("subscription with %s must also set %s",
253  "slot_name = NONE", "create_slot = false")));
254  }
255 }
static bool create_slot
int errcode(int sqlerrcode)
Definition: elog.c:570
#define connect(s, name, namelen)
Definition: win32_port.h:444
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
char * defGetString(DefElem *def)
Definition: define.c:49
int synchronous_commit
Definition: xact.c:83
#define ereport(elevel, rest)
Definition: elog.h:141
#define Assert(condition)
Definition: c.h:732
#define lfirst(lc)
Definition: pg_list.h:190
int errmsg(const char *fmt,...)
Definition: elog.c:784
char * defname
Definition: parsenodes.h:730
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition: guc.c:6722

◆ publicationListToArray()

static Datum publicationListToArray ( List publist)
static

Definition at line 261 of file subscriptioncmds.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, construct_array(), CStringGetTextDatum, CurrentMemoryContext, ereport, errcode(), errmsg(), ERROR, lfirst, list_length(), MemoryContextDelete(), MemoryContextSwitchTo(), name, palloc(), PointerGetDatum, and strVal.

Referenced by AlterSubscription(), and CreateSubscription().

/*
 * publicationListToArray -- turn a list of publication names into a text
 * array Datum.
 *
 * publist is a List of string nodes (read with strVal).  Returns
 * PointerGetDatum() of a TEXTOID array with one element per list entry, in
 * list order.  Raises ERROR (ERRCODE_SYNTAX_ERROR) when the same name
 * appears more than once.
 *
 * The per-element text datums are allocated while "memcxt" is current;
 * construct_array() is called only after switching back to the previous
 * context, and memcxt is deleted before returning.
 *
 * NOTE(review): the embedded listing numbers skip 271 and 273, so the
 * statement that creates memcxt is only partly visible in this extract;
 * confirm against the real source file.
 */
262 {
263  ArrayType *arr;
264  Datum *datums;
265  int j = 0;
266  ListCell *cell;
267  MemoryContext memcxt;
268  MemoryContext oldcxt;
269 
270  /* Create memory context for temporary allocations. */
/* NOTE(review): listing lines 271 and 273 are missing; presumably "memcxt = AllocSetContextCreate(CurrentMemoryContext," and the closing ALLOCSET_DEFAULT_SIZES argument, around the name string on line 272 -- confirm. */
272  "publicationListToArray to array",
274  oldcxt = MemoryContextSwitchTo(memcxt);
275 
276  datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
277 
278  foreach(cell, publist)
279  {
280  char *name = strVal(lfirst(cell));
281  ListCell *pcell;
282 
283  /*
283  * Check for duplicates: compare this name against the entries that
283  * precede it in the list, stopping once the scan reaches the current
283  * cell.
283  */
284  foreach(pcell, publist)
285  {
286  char *pname = strVal(lfirst(pcell));
287 
288  if (pcell == cell)
289  break;
290 
291  if (strcmp(name, pname) == 0)
292  ereport(ERROR,
293  (errcode(ERRCODE_SYNTAX_ERROR),
294  errmsg("publication name \"%s\" used more than once",
295  pname)));
296  }
297 
298  datums[j++] = CStringGetTextDatum(name);
299  }
300 
/* Switch back first, so the array is built while the previous context is current. */
301  MemoryContextSwitchTo(oldcxt);
302 
303  arr = construct_array(datums, list_length(publist),
304  TEXTOID, -1, false, 'i');
305 
/* Discards datums[] and the temporary text values allocated above. */
306  MemoryContextDelete(memcxt);
307 
308  return PointerGetDatum(arr);
309 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:169
#define PointerGetDatum(X)
Definition: postgres.h:556
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ArrayType * construct_array(Datum *elems, int nelems, Oid elmtype, int elmlen, bool elmbyval, char elmalign)
Definition: arrayfuncs.c:3291
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:570
#define ERROR
Definition: elog.h:43
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:191
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define ereport(elevel, rest)
Definition: elog.h:141
uintptr_t Datum
Definition: postgres.h:367
#define lfirst(lc)
Definition: pg_list.h:190
static int list_length(const List *l)
Definition: pg_list.h:169
const char * name
Definition: encode.c:521
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:784
#define CStringGetTextDatum(s)
Definition: builtins.h:83