PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
subscriptioncmds.c File Reference
Include dependency graph for subscriptioncmds.c:

Go to the source code of this file.

Functions

static List * fetch_table_list (WalReceiverConn *wrconn, List *publications)
 
static void parse_subscription_options (List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit)
 
static Datum publicationListToArray (List *publist)
 
ObjectAddress CreateSubscription (CreateSubscriptionStmt *stmt, bool isTopLevel)
 
static void AlterSubscription_refresh (Subscription *sub, bool copy_data)
 
ObjectAddress AlterSubscription (AlterSubscriptionStmt *stmt)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
static void AlterSubscriptionOwner_internal (Relation rel, HeapTuple tup, Oid newOwnerId)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 

Function Documentation

ObjectAddress AlterSubscription ( AlterSubscriptionStmt * stmt)

Definition at line 601 of file subscriptioncmds.c.

References ACL_KIND_SUBSCRIPTION, aclcheck_error(), ACLCHECK_NOT_OWNER, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_PUBLICATION, ALTER_SUBSCRIPTION_PUBLICATION_REFRESH, ALTER_SUBSCRIPTION_REFRESH, AlterSubscription_refresh(), Anum_pg_subscription_subconninfo, Anum_pg_subscription_subenabled, Anum_pg_subscription_subpublications, Anum_pg_subscription_subslotname, Anum_pg_subscription_subsynccommit, ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleUpdate(), AlterSubscriptionStmt::conninfo, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, elog, Subscription::enabled, ereport, errcode(), errmsg(), ERROR, GetSubscription(), GetUserId(), heap_close, heap_freetuple(), heap_modify_tuple(), heap_open(), HeapTupleGetOid, HeapTupleIsValid, InvokeObjectPostAlterHook, AlterSubscriptionStmt::kind, load_file(), MyDatabaseId, namein(), Natts_pg_subscription, NULL, ObjectAddressSet, AlterSubscriptionStmt::options, parse_subscription_options(), pg_subscription_ownercheck(), AlterSubscriptionStmt::publication, publicationListToArray(), Subscription::publications, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, AlterSubscriptionStmt::subname, SUBSCRIPTIONNAME, SubscriptionRelationId, synchronous_commit, HeapTupleData::t_self, values, and walrcv_check_conninfo.

Referenced by ProcessUtilitySlow().

602 {
603  Relation rel;
604  ObjectAddress myself;
605  bool nulls[Natts_pg_subscription];
606  bool replaces[Natts_pg_subscription];
608  HeapTuple tup;
609  Oid subid;
610  bool update_tuple = false;
611  Subscription *sub;
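612 
613  rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
614 
615  /* Fetch the existing tuple. */
616  tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
617  CStringGetDatum(stmt->subname));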
618 
619  if (!HeapTupleIsValid(tup))
620  ereport(ERROR,
621  (errcode(ERRCODE_UNDEFINED_OBJECT),
622  errmsg("subscription \"%s\" does not exist",
623  stmt->subname)));
624 
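625  /* must be owner */
626  if (!pg_subscription_ownercheck(HeapTupleGetOid(tup), GetUserId()))
627  aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
628  stmt->subname);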
629 
630  subid = HeapTupleGetOid(tup);
631  sub = GetSubscription(subid, false);
632 
633  /* Form a new tuple. */
634  memset(values, 0, sizeof(values));
635  memset(nulls, false, sizeof(nulls));
636  memset(replaces, false, sizeof(replaces));
637 
638  switch (stmt->kind)
639  {
641  {
642  char *slotname;
643  bool slotname_given;
644  char *synchronous_commit;
645 
647  NULL, &slotname_given, &slotname,
648  NULL, &synchronous_commit);
649 
650  if (slotname_given)
651  {
652  if (sub->enabled && !slotname)
653  ereport(ERROR,
654  (errcode(ERRCODE_SYNTAX_ERROR),
655  errmsg("cannot set slot_name = NONE for enabled subscription")));
656 
657  if (slotname)
660  else
661  nulls[Anum_pg_subscription_subslotname - 1] = true;
662  replaces[Anum_pg_subscription_subslotname - 1] = true;
663  }
664 
665  if (synchronous_commit)
666  {
668  CStringGetTextDatum(synchronous_commit);
669  replaces[Anum_pg_subscription_subsynccommit - 1] = true;
670  }
671 
672  update_tuple = true;
673  break;
674  }
675 
677  {
678  bool enabled,
679  enabled_given;
680 
682  &enabled_given, &enabled, NULL,
683  NULL, NULL, NULL, NULL);
684  Assert(enabled_given);
685 
686  if (!sub->slotname && enabled)
687  ereport(ERROR,
688  (errcode(ERRCODE_SYNTAX_ERROR),
689  errmsg("cannot enable subscription that does not have a slot name")));
690 
691  values[Anum_pg_subscription_subenabled - 1] =
692  BoolGetDatum(enabled);
693  replaces[Anum_pg_subscription_subenabled - 1] = true;
694 
695  if (enabled)
697 
698  update_tuple = true;
699  break;
700  }
701 
703  /* Load the library providing us libpq calls. */
704  load_file("libpqwalreceiver", false);
705  /* Check the connection info string. */
707 
710  replaces[Anum_pg_subscription_subconninfo - 1] = true;
711  update_tuple = true;
712  break;
713 
716  {
717  bool copy_data;
718 
720  NULL, NULL, NULL, &copy_data,
721  NULL);
722 
725  replaces[Anum_pg_subscription_subpublications - 1] = true;
726 
727  update_tuple = true;
728 
729  /* Refresh if user asked us to. */
731  {
732  if (!sub->enabled)
733  ereport(ERROR,
734  (errcode(ERRCODE_SYNTAX_ERROR),
735  errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
736 
737  /* Make sure refresh sees the new list of publications. */
738  sub->publications = stmt->publication;
739 
740  AlterSubscription_refresh(sub, copy_data);
741  }
742 
743  break;
744  }
745 
747  {
748  bool copy_data;
749 
750  if (!sub->enabled)
751  ereport(ERROR,
752  (errcode(ERRCODE_SYNTAX_ERROR),
753  errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
754 
756  NULL, NULL, NULL, &copy_data,
757  NULL);
758 
759  AlterSubscription_refresh(sub, copy_data);
760 
761  break;
762  }
763 
764  default:
765  elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
766  stmt->kind);
767  }
768 
769  /* Update the catalog if needed. */
770  if (update_tuple)
771  {
772  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
773  replaces);
774 
775  CatalogTupleUpdate(rel, &tup->t_self, tup);
776 
777  heap_freetuple(tup);
778  }
779 
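780  heap_close(rel, RowExclusiveLock);
781 
782  ObjectAddressSet(myself, SubscriptionRelationId, subid);
783 
784  InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
785 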
786  return myself;
787 }
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:46
#define Anum_pg_subscription_subpublications
#define RelationGetDescr(relation)
Definition: rel.h:429
Oid GetUserId(void)
Definition: miscinit.c:283
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:244
int errcode(int sqlerrcode)
Definition: elog.c:575
static Datum publicationListToArray(List *publist)
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:584
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1372
unsigned int Oid
Definition: postgres_ext.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define ERROR
Definition: elog.h:43
#define Natts_pg_subscription
ItemPointerData t_self
Definition: htup.h:65
#define SubscriptionRelationId
AlterSubscriptionType kind
Definition: parsenodes.h:3385
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3399
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:584
List * publications
int synchronous_commit
Definition: xact.c:82
#define ereport(elevel, rest)
Definition: elog.h:122
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:163
#define Anum_pg_subscription_subslotname
uintptr_t Datum
Definition: postgres.h:372
Oid MyDatabaseId
Definition: globals.c:76
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1284
#define BoolGetDatum(X)
Definition: postgres.h:408
#define Anum_pg_subscription_subconninfo
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:210
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit)
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define Anum_pg_subscription_subenabled
static Datum values[MAXATTR]
Definition: bootstrap.c:163
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:769
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:167
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:791
#define Anum_pg_subscription_subsynccommit
static void AlterSubscription_refresh(Subscription *sub, bool copy_data)
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5110
static void AlterSubscription_refresh ( Subscription * sub,
bool  copy_data 
)
static

Definition at line 491 of file subscriptioncmds.c.

References AccessShareLock, CheckSubscriptionRelkind(), Subscription::conninfo, ereport, errmsg(), ERROR, fetch_table_list(), get_namespace_name(), get_rel_name(), get_rel_relkind(), GetSubscriptionRelations(), InvalidXLogRecPtr, lfirst, list_length(), load_file(), Subscription::name, NOTICE, Subscription::oid, oid_cmp(), palloc(), Subscription::publications, qsort, quote_identifier(), RangeVarGetRelid, SubscriptionRelState::relid, RangeVar::relname, RemoveSubscriptionRel(), RangeVar::schemaname, SetSubscriptionRelState(), SUBREL_STATE_INIT, SUBREL_STATE_READY, walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by AlterSubscription().

492 {
493  char *err;
494  List *pubrel_names;
495  List *subrel_states;
496  Oid *subrel_local_oids;
497  Oid *pubrel_local_oids;
498  ListCell *lc;
499  int off;
500 
501  /* Load the library providing us libpq calls. */
502  load_file("libpqwalreceiver", false);
503 
504  /* Try to connect to the publisher. */
505  wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
506  if (!wrconn)
507  ereport(ERROR,
508  (errmsg("could not connect to the publisher: %s", err)));
509 
510  /* Get the table list from publisher. */
511  pubrel_names = fetch_table_list(wrconn, sub->publications);
512 
513  /* We are done with the remote side, close connection. */
514  walrcv_disconnect(wrconn);
515 
516  /* Get local table list. */
517  subrel_states = GetSubscriptionRelations(sub->oid);
518 
519  /*
520  * Build qsorted array of local table oids for faster lookup. This can
521  * potentially contain all tables in the database so speed of lookup is
522  * important.
523  */
524  subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
525  off = 0;
526  foreach(lc, subrel_states)
527  {
528  SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
529 
530  subrel_local_oids[off++] = relstate->relid;
531  }
532  qsort(subrel_local_oids, list_length(subrel_states),
533  sizeof(Oid), oid_cmp);
534 
535  /*
536  * Walk over the remote tables and try to match them to locally known
537  * tables. If the table is not known locally create a new state for it.
538  *
539  * Also builds array of local oids of remote tables for the next step.
540  */
541  off = 0;
542  pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
543 
544  foreach(lc, pubrel_names)
545  {
546  RangeVar *rv = (RangeVar *) lfirst(lc);
547  Oid relid;
548 
549  relid = RangeVarGetRelid(rv, AccessShareLock, false);
550 
551  /* Check for supported relkind. */
553  rv->schemaname, rv->relname);
554 
555  pubrel_local_oids[off++] = relid;
556 
557  if (!bsearch(&relid, subrel_local_oids,
558  list_length(subrel_states), sizeof(Oid), oid_cmp))
559  {
560  SetSubscriptionRelState(sub->oid, relid,
563  ereport(NOTICE,
564  (errmsg("added subscription for table %s.%s",
566  quote_identifier(rv->relname))));
567  }
568  }
569 
570  /*
571  * Next remove state for tables we should not care about anymore using the
572  * data we collected above
573  */
574  qsort(pubrel_local_oids, list_length(pubrel_names),
575  sizeof(Oid), oid_cmp);
576 
577  for (off = 0; off < list_length(subrel_states); off++)
578  {
579  Oid relid = subrel_local_oids[off];
580 
581  if (!bsearch(&relid, pubrel_local_oids,
582  list_length(pubrel_names), sizeof(Oid), oid_cmp))
583  {
584  char *namespace;
585 
586  RemoveSubscriptionRel(sub->oid, relid);
587 
588  namespace = get_namespace_name(get_rel_namespace(relid));
589  ereport(NOTICE,
590  (errmsg("removed subscription for table %s.%s",
591  quote_identifier(namespace),
592  quote_identifier(get_rel_name(relid)))));
593  }
594  }
595 }
#define SUBREL_STATE_INIT
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
WalReceiverConn * wrconn
Definition: worker.c:107
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void RemoveSubscriptionRel(Oid subid, Oid relid)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10284
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:53
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1801
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1750
#define AccessShareLock
Definition: lockdefs.h:36
unsigned int Oid
Definition: postgres_ext.h:31
char * schemaname
Definition: primnodes.h:67
char * relname
Definition: primnodes.h:68
#define ERROR
Definition: elog.h:43
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3033
List * GetSubscriptionRelations(Oid subid)
List * publications
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
#define ereport(elevel, rest)
Definition: elog.h:122
#define NOTICE
Definition: elog.h:37
#define lfirst(lc)
Definition: pg_list.h:106
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
int oid_cmp(const void *p1, const void *p2)
Definition: oid.c:336
static int list_length(const List *l)
Definition: pg_list.h:89
#define walrcv_disconnect(conn)
Definition: walreceiver.h:264
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define qsort(a, b, c, d)
Definition: port.h:440
List
Definition: pg_list.h:45
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1726
Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define SUBREL_STATE_READY
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
ObjectAddress AlterSubscriptionOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 1011 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, heap_close, heap_freetuple(), heap_open(), HeapTupleGetOid, HeapTupleIsValid, MyDatabaseId, ObjectAddressSet, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONNAME, and SubscriptionRelationId.

Referenced by ExecAlterOwnerStmt().

1012 {
1013  Oid subid;
1014  HeapTuple tup;
1015  Relation rel;
1016  ObjectAddress address;
1017 
1019 
1022 
1023  if (!HeapTupleIsValid(tup))
1024  ereport(ERROR,
1025  (errcode(ERRCODE_UNDEFINED_OBJECT),
1026  errmsg("subscription \"%s\" does not exist", name)));
1027 
1028  subid = HeapTupleGetOid(tup);
1029 
1030  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1031 
1032  ObjectAddressSet(address, SubscriptionRelationId, subid);
1033 
1034  heap_freetuple(tup);
1035 
1037 
1038  return address;
1039 }
int errcode(int sqlerrcode)
Definition: elog.c:575
#define heap_close(r, l)
Definition: heapam.h:97
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1372
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define SubscriptionRelationId
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:584
#define ereport(elevel, rest)
Definition: elog.h:122
Oid MyDatabaseId
Definition: globals.c:76
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1284
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
const char * name
Definition: encode.c:521
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:167
static void AlterSubscriptionOwner_internal ( Relation  rel,
HeapTuple  tup,
Oid  newOwnerId 
)
static

Definition at line 974 of file subscriptioncmds.c.

References ACL_KIND_SUBSCRIPTION, aclcheck_error(), ACLCHECK_NOT_OWNER, CatalogTupleUpdate(), changeDependencyOnOwner(), ereport, errcode(), errhint(), errmsg(), ERROR, GETSTRUCT, GetUserId(), HeapTupleGetOid, InvokeObjectPostAlterHook, NameStr, pg_subscription_ownercheck(), SubscriptionRelationId, superuser_arg(), and HeapTupleData::t_self.

Referenced by AlterSubscriptionOwner(), and AlterSubscriptionOwner_oid().

975 {
976  Form_pg_subscription form;
977 
978  form = (Form_pg_subscription) GETSTRUCT(tup);
979 
980  if (form->subowner == newOwnerId)
981  return;
982 
985  NameStr(form->subname));
986 
987  /* New owner must be a superuser */
988  if (!superuser_arg(newOwnerId))
989  ereport(ERROR,
990  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
991  errmsg("permission denied to change owner of subscription \"%s\"",
992  NameStr(form->subname)),
993  errhint("The owner of a subscription must be a superuser.")));
994 
995  form->subowner = newOwnerId;
996  CatalogTupleUpdate(rel, &tup->t_self, tup);
997 
998  /* Update owner dependency reference */
1000  HeapTupleGetOid(tup),
1001  newOwnerId);
1002 
1004  HeapTupleGetOid(tup), 0);
1005 }
int errhint(const char *fmt,...)
Definition: elog.c:987
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
Oid GetUserId(void)
Definition: miscinit.c:283
int errcode(int sqlerrcode)
Definition: elog.c:575
FormData_pg_subscription * Form_pg_subscription
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:304
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define SubscriptionRelationId
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3399
#define ereport(elevel, rest)
Definition: elog.h:122
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:163
bool superuser_arg(Oid roleid)
Definition: superuser.c:57
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:210
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:499
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5110
void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 1045 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_close, heap_freetuple(), heap_open(), HeapTupleIsValid, ObjectIdGetDatum, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, and SubscriptionRelationId.

Referenced by shdepReassignOwned().

1046 {
1047  HeapTuple tup;
1048  Relation rel;
1049 
1051 
1053 
1054  if (!HeapTupleIsValid(tup))
1055  ereport(ERROR,
1056  (errcode(ERRCODE_UNDEFINED_OBJECT),
1057  errmsg("subscription with OID %u does not exist", subid)));
1058 
1059  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1060 
1061  heap_freetuple(tup);
1062 
1064 }
int errcode(int sqlerrcode)
Definition: elog.c:575
#define heap_close(r, l)
Definition: heapam.h:97
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1372
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
#define SubscriptionRelationId
#define RowExclusiveLock
Definition: lockdefs.h:38
#define ereport(elevel, rest)
Definition: elog.h:122
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1284
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:165
int errmsg(const char *fmt,...)
Definition: elog.c:797
ObjectAddress CreateSubscription ( CreateSubscriptionStmt * stmt,
bool  isTopLevel 
)

Definition at line 290 of file subscriptioncmds.c.

References AccessShareLock, Anum_pg_subscription_subconninfo, Anum_pg_subscription_subdbid, Anum_pg_subscription_subenabled, Anum_pg_subscription_subname, Anum_pg_subscription_subowner, Anum_pg_subscription_subpublications, Anum_pg_subscription_subslotname, Anum_pg_subscription_subsynccommit, ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleInsert(), CheckSubscriptionRelkind(), connect, CreateSubscriptionStmt::conninfo, CRS_NOEXPORT_SNAPSHOT, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, fetch_table_list(), get_rel_relkind(), GetSysCacheOid2, GetUserId(), heap_close, heap_form_tuple(), heap_freetuple(), heap_open(), InvalidXLogRecPtr, InvokeObjectPostCreateHook, lfirst, load_file(), MyDatabaseId, NAMEDATALEN, namein(), Natts_pg_subscription, NOTICE, NULL, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, CreateSubscriptionStmt::options, parse_subscription_options(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PreventTransactionChain(), CreateSubscriptionStmt::publication, publicationListToArray(), RangeVarGetRelid, recordDependencyOnOwner(), RelationGetDescr, RangeVar::relname, replorigin_create(), RowExclusiveLock, RangeVar::schemaname, SetSubscriptionRelState(), snprintf(), CreateSubscriptionStmt::subname, SUBREL_STATE_INIT, SUBREL_STATE_READY, SUBSCRIPTIONNAME, SubscriptionRelationId, superuser(), synchronous_commit, values, walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, WARNING, and wrconn.

Referenced by ProcessUtilitySlow().

291 {
292  Relation rel;
293  ObjectAddress myself;
294  Oid subid;
295  bool nulls[Natts_pg_subscription];
297  Oid owner = GetUserId();
298  HeapTuple tup;
299  bool connect;
300  bool enabled_given;
301  bool enabled;
302  bool copy_data;
303  char *synchronous_commit;
304  char *conninfo;
305  char *slotname;
306  bool slotname_given;
307  char originname[NAMEDATALEN];
308  bool create_slot;
309  List *publications;
310 
311  /*
312  * Parse and check options.
313  *
314  * Connection and publication should not be specified here.
315  */
316  parse_subscription_options(stmt->options, &connect, &enabled_given,
317  &enabled, &create_slot, &slotname_given,
318  &slotname, &copy_data, &synchronous_commit);
319 
320  /*
321  * Since creating a replication slot is not transactional, rolling back
322  * the transaction leaves the created replication slot. So we cannot run
323  * CREATE SUBSCRIPTION inside a transaction block if creating a
324  * replication slot.
325  */
326  if (create_slot)
327  PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
328 
329  if (!superuser())
330  ereport(ERROR,
331  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
332  (errmsg("must be superuser to create subscriptions"))));
333 
335 
336  /* Check if name is used */
338  CStringGetDatum(stmt->subname));
339  if (OidIsValid(subid))
340  {
341  ereport(ERROR,
343  errmsg("subscription \"%s\" already exists",
344  stmt->subname)));
345  }
346 
347  if (!slotname_given && slotname == NULL)
348  slotname = stmt->subname;
349 
350  /* The default for synchronous_commit of subscriptions is off. */
351  if (synchronous_commit == NULL)
352  synchronous_commit = "off";
353 
354  conninfo = stmt->conninfo;
355  publications = stmt->publication;
356 
357  /* Load the library providing us libpq calls. */
358  load_file("libpqwalreceiver", false);
359 
360  /* Check the connection info string. */
361  walrcv_check_conninfo(conninfo);
362 
363  /* Everything ok, form a new tuple. */
364  memset(values, 0, sizeof(values));
365  memset(nulls, false, sizeof(nulls));
366 
368  values[Anum_pg_subscription_subname - 1] =
370  values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
371  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
373  CStringGetTextDatum(conninfo);
374  if (slotname)
377  else
378  nulls[Anum_pg_subscription_subslotname - 1] = true;
380  CStringGetTextDatum(synchronous_commit);
382  publicationListToArray(publications);
383 
384  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
385 
386  /* Insert tuple into catalog. */
387  subid = CatalogTupleInsert(rel, tup);
388  heap_freetuple(tup);
389 
391 
392  snprintf(originname, sizeof(originname), "pg_%u", subid);
393  replorigin_create(originname);
394 
395  /*
396  * Connect to remote side to execute requested commands and fetch table
397  * info.
398  */
399  if (connect)
400  {
401  XLogRecPtr lsn;
402  char *err;
404  List *tables;
405  ListCell *lc;
406  char table_state;
407 
408  /* Try to connect to the publisher. */
409  wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
410  if (!wrconn)
411  ereport(ERROR,
412  (errmsg("could not connect to the publisher: %s", err)));
413 
414  PG_TRY();
415  {
416  /*
417  * Set sync state based on if we were asked to do data copy or
418  * not.
419  */
420  table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
421 
422  /*
423  * Get the table list from publisher and build local table status
424  * info.
425  */
426  tables = fetch_table_list(wrconn, publications);
427  foreach(lc, tables)
428  {
429  RangeVar *rv = (RangeVar *) lfirst(lc);
430  Oid relid;
431 
432  relid = RangeVarGetRelid(rv, AccessShareLock, false);
433 
434  /* Check for supported relkind. */
436  rv->schemaname, rv->relname);
437 
438  SetSubscriptionRelState(subid, relid, table_state,
440  }
441 
442  ereport(NOTICE,
443  (errmsg("synchronized table states")));
444 
445  /*
446  * If requested, create permanent slot for the subscription. We
447  * won't use the initial snapshot for anything, so no need to
448  * export it.
449  */
450  if (create_slot)
451  {
452  Assert(slotname);
453 
454  walrcv_create_slot(wrconn, slotname, false,
455  CRS_NOEXPORT_SNAPSHOT, &lsn);
456  ereport(NOTICE,
457  (errmsg("created replication slot \"%s\" on publisher",
458  slotname)));
459  }
460  }
461  PG_CATCH();
462  {
463  /* Close the connection in case of failure. */
464  walrcv_disconnect(wrconn);
465  PG_RE_THROW();
466  }
467  PG_END_TRY();
468 
469  /* And we are done with the remote side. */
470  walrcv_disconnect(wrconn);
471  }
472  else
474  (errmsg("tables were not subscribed, you will have to run "
475  "ALTER SUBSCRIPTION ... REFRESH PUBLICATION to "
476  "subscribe the tables")));
477 
479 
480  if (enabled)
482 
484 
486 
487  return myself;
488 }
#define connect(s, name, namelen)
Definition: win32.h:373
#define SUBREL_STATE_INIT
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
WalReceiverConn * wrconn
Definition: worker.c:107
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:46
#define Anum_pg_subscription_subpublications
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:145
#define Anum_pg_subscription_subowner
#define RelationGetDescr(relation)
Definition: rel.h:429
Oid GetUserId(void)
Definition: miscinit.c:283
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:53
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:244
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1801
#define AccessShareLock
Definition: lockdefs.h:36
int errcode(int sqlerrcode)
Definition: elog.c:575
bool superuser(void)
Definition: superuser.c:47
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
static Datum publicationListToArray(List *publist)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:584
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:159
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1372
unsigned int Oid
Definition: postgres_ext.h:31
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:260
#define OidIsValid(objectId)
Definition: c.h:538
#define GetSysCacheOid2(cacheId, key1, key2)
Definition: syscache.h:185
char * schemaname
Definition: primnodes.h:67
#define NAMEDATALEN
char * relname
Definition: primnodes.h:68
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:162
#define Natts_pg_subscription
#define Anum_pg_subscription_subname
#define SubscriptionRelationId
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:584
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
RepOriginId replorigin_create(char *roname)
Definition: origin.c:235
int synchronous_commit
Definition: xact.c:82
#define ereport(elevel, rest)
Definition: elog.h:122
#define Anum_pg_subscription_subslotname
#define Anum_pg_subscription_subdbid
#define WARNING
Definition: elog.h:40
uintptr_t Datum
Definition: postgres.h:372
Oid MyDatabaseId
Definition: globals.c:76
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1284
#define BoolGetDatum(X)
Definition: postgres.h:408
#define NOTICE
Definition: elog.h:37
#define Anum_pg_subscription_subconninfo
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
#define lfirst(lc)
Definition: pg_list.h:106
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit)
#define PG_RE_THROW()
Definition: elog.h:314
#define walrcv_disconnect(conn)
Definition: walreceiver.h:264
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define Anum_pg_subscription_subenabled
static Datum values[MAXATTR]
Definition: bootstrap.c:163
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:769
#define PG_TRY()
Definition: elog.h:284
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:31
List
Definition: pg_list.h:45
Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define SUBREL_STATE_READY
#define Anum_pg_subscription_subsynccommit
#define PG_END_TRY()
Definition: elog.h:300
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3156
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
void DropSubscription ( DropSubscriptionStmt * stmt,
bool  isTopLevel 
)

Definition at line 793 of file subscriptioncmds.c.

References AccessExclusiveLock, ACL_KIND_SUBSCRIPTION, aclcheck_error(), ACLCHECK_NOT_OWNER, Anum_pg_subscription_subconninfo, Anum_pg_subscription_subname, Anum_pg_subscription_subslotname, appendStringInfo(), Assert, CatalogTupleDelete(), CStringGetDatum, StringInfoData::data, DatumGetName, deleteSharedDependencyRecordsFor(), ereport, WalRcvExecResult::err, errcode(), errdetail(), errhint(), errmsg(), ERROR, EventTriggerSQLDropAddObject(), GetUserId(), heap_close, heap_open(), HeapTupleGetOid, HeapTupleIsValid, initStringInfo(), InvalidOid, InvalidRepOriginId, InvokeObjectDropHook, load_file(), LockSharedObject(), logicalrep_worker_stop(), DropSubscriptionStmt::missing_ok, MyDatabaseId, NAMEDATALEN, NameStr, NoLock, NOTICE, NULL, ObjectAddressSet, pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, pg_subscription_ownercheck(), PG_TRY, PreventTransactionChain(), pstrdup(), quote_identifier(), ReleaseSysCache(), RemoveSubscriptionRel(), replorigin_by_name(), replorigin_drop(), SearchSysCache2, snprintf(), WalRcvExecResult::status, DropSubscriptionStmt::subname, SUBSCRIPTIONNAME, SUBSCRIPTIONOID, SubscriptionRelationId, SysCacheGetAttr(), HeapTupleData::t_self, TextDatumGetCString, walrcv_clear_result(), walrcv_connect, walrcv_disconnect, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by ProcessUtilitySlow().

794 {
795  Relation rel;
796  ObjectAddress myself;
797  HeapTuple tup;
798  Oid subid;
799  Datum datum;
800  bool isnull;
801  char *subname;
802  char *conninfo;
803  char *slotname;
804  char originname[NAMEDATALEN];
805  char *err = NULL;
806  RepOriginId originid;
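807  WalReceiverConn *wrconn = NULL;
808  StringInfoData cmd;
809 
810  /*
811  * Lock pg_subscription with AccessExclusiveLock to ensure that the
812  * launcher doesn't restart new worker during dropping the subscription
813  */
814  rel = heap_open(SubscriptionRelationId, AccessExclusiveLock);
815 
816  tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
817  CStringGetDatum(stmt->subname));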
818 
819  if (!HeapTupleIsValid(tup))
820  {
821  heap_close(rel, NoLock);
822 
823  if (!stmt->missing_ok)
824  ereport(ERROR,
825  (errcode(ERRCODE_UNDEFINED_OBJECT),
826  errmsg("subscription \"%s\" does not exist",
827  stmt->subname)));
828  else
829  ereport(NOTICE,
830  (errmsg("subscription \"%s\" does not exist, skipping",
831  stmt->subname)));
832 
833  return;
834  }
835 
836  subid = HeapTupleGetOid(tup);
837 
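838  /* must be owner */
839  if (!pg_subscription_ownercheck(subid, GetUserId()))
840  aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION,
841  stmt->subname);
842 
843  /* DROP hook for the subscription being removed */
844  InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
845 
846  /*
847  * Lock the subscription so nobody else can do anything with it (including
848  * the replication workers).
849  */
850  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
851 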
852  /* Get subname */
853  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
854  Anum_pg_subscription_subname, &isnull);
855  Assert(!isnull);
856  subname = pstrdup(NameStr(*DatumGetName(datum)));
857 
858  /* Get conninfo */
859  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
860  Anum_pg_subscription_subconninfo, &isnull);
861  Assert(!isnull);
862  conninfo = TextDatumGetCString(datum);
863 
864  /* Get slotname */
865  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
866  Anum_pg_subscription_subslotname, &isnull);
867  if (!isnull)
868  slotname = pstrdup(NameStr(*DatumGetName(datum)));
869  else
870  slotname = NULL;
871 
872  /*
873  * Since dropping a replication slot is not transactional, the replication
874  * slot stays dropped even if the transaction rolls back. So we cannot
875  * run DROP SUBSCRIPTION inside a transaction block if dropping the
876  * replication slot.
877  *
878  * XXX The command name should really be something like "DROP SUBSCRIPTION
879  * of a subscription that is associated with a replication slot", but we
880  * don't have the proper facilities for that.
881  */
882  if (slotname)
883  PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
884 
885 
886  ObjectAddressSet(myself, SubscriptionRelationId, subid);
887  EventTriggerSQLDropAddObject(&myself, true, true);
888 
889  /* Remove the tuple from catalog. */
890  CatalogTupleDelete(rel, &tup->t_self);
891 
892  ReleaseSysCache(tup);
893 
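894  /* Clean up dependencies */
895  deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
896 
897  /* Remove any associated relation synchronization states. */
898  RemoveSubscriptionRel(subid, InvalidOid);
899 
900  /* Kill the apply worker so that the slot becomes accessible. */
901  logicalrep_worker_stop(subid, InvalidOid);
902 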
903  /* Remove the origin tracking if exists. */
904  snprintf(originname, sizeof(originname), "pg_%u", subid);
905  originid = replorigin_by_name(originname, true);
906  if (originid != InvalidRepOriginId)
907  replorigin_drop(originid);
908 
909  /*
910  * If there is no slot associated with the subscription, we can finish
911  * here.
912  */
913  if (!slotname)
914  {
915  heap_close(rel, NoLock);
916  return;
917  }
918 
919  /*
920  * Otherwise drop the replication slot at the publisher node using the
921  * replication connection.
922  */
923  load_file("libpqwalreceiver", false);
924 
925  initStringInfo(&cmd);
926  appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
927 
928  wrconn = walrcv_connect(conninfo, true, subname, &err);
929  if (wrconn == NULL)
930  ereport(ERROR,
931  (errmsg("could not connect to publisher when attempting to "
932  "drop the replication slot \"%s\"", slotname),
933  errdetail("The error was: %s", err),
934  errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
935  "to disassociate the subscription from the slot.")));
936 
937  PG_TRY();
938  {
939  WalRcvExecResult *res;
940 
941  res = walrcv_exec(wrconn, cmd.data, 0, NULL);
942 
943  if (res->status != WALRCV_OK_COMMAND)
944  ereport(ERROR,
945  (errmsg("could not drop the replication slot \"%s\" on publisher",
946  slotname),
947  errdetail("The error was: %s", res->err)));
948  else
949  ereport(NOTICE,
950  (errmsg("dropped replication slot \"%s\" on publisher",
951  slotname)));
952 
953  walrcv_clear_result(res);
954  }
955  PG_CATCH();
956  {
957  /* Close the connection in case of failure */
958  walrcv_disconnect(wrconn);
959  PG_RE_THROW();
960  }
961  PG_END_TRY();
962 
963  walrcv_disconnect(wrconn);
964 
965  pfree(cmd.data);
966 
967  heap_close(rel, NoLock);
968 }
WalReceiverConn * wrconn
Definition: worker.c:107
int errhint(const char *fmt,...)
Definition: elog.c:987
void RemoveSubscriptionRel(Oid subid, Oid relid)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10284
Oid GetUserId(void)
Definition: miscinit.c:283
char * pstrdup(const char *in)
Definition: mcxt.c:1077
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:154
uint16 RepOriginId
Definition: xlogdefs.h:51
int errcode(int sqlerrcode)
Definition: elog.c:575
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3, 4)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:255
#define heap_close(r, l)
Definition: heapam.h:97
unsigned int Oid
Definition: postgres_ext.h:31
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
#define NAMEDATALEN
#define DatumGetName(X)
Definition: postgres.h:591
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:268
void pfree(void *pointer)
Definition: mcxt.c:950
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
#define ERROR
Definition: elog.h:43
#define Anum_pg_subscription_subname
ItemPointerData t_self
Definition: htup.h:65
#define SubscriptionRelationId
#define NoLock
Definition: lockdefs.h:34
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3399
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:411
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define CStringGetDatum(X)
Definition: postgres.h:584
#define ereport(elevel, rest)
Definition: elog.h:122
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:823
#define Anum_pg_subscription_subslotname
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
void replorigin_drop(RepOriginId roident)
Definition: origin.c:327
#define TextDatumGetCString(d)
Definition: builtins.h:92
uintptr_t Datum
Definition: postgres.h:372
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1117
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1279
Oid MyDatabaseId
Definition: globals.c:76
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1284
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:871
#define InvalidOid
Definition: postgres_ext.h:36
#define NOTICE
Definition: elog.h:37
#define Anum_pg_subscription_subconninfo
#define PG_CATCH()
Definition: elog.h:293
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
WalRcvExecStatus status
Definition: walreceiver.h:187
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
#define PG_RE_THROW()
Definition: elog.h:314
#define walrcv_disconnect(conn)
Definition: walreceiver.h:264
#define InvalidRepOriginId
Definition: origin.h:34
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define AccessExclusiveLock
Definition: lockdefs.h:46
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:499
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
#define PG_TRY()
Definition: elog.h:284
#define PG_END_TRY()
Definition: elog.h:300
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:262
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3156
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5110
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
#define SearchSysCache2(cacheId, key1, key2)
Definition: syscache.h:158
static List * fetch_table_list ( WalReceiverConn * wrconn,
List * publications 
)
static

Definition at line 1071 of file subscriptioncmds.c.

References appendStringInfo(), appendStringInfoString(), Assert, StringInfoData::data, ereport, WalRcvExecResult::err, errmsg(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), initStringInfo(), lappend(), lfirst, list_length(), makeRangeVar(), MakeSingleTupleTableSlot(), NIL, pfree(), pstrdup(), quote_literal_cstr(), slot_getattr(), WalRcvExecResult::status, strVal, TextDatumGetCString, TEXTOID, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, and WALRCV_OK_TUPLES.

Referenced by AlterSubscription_refresh(), and CreateSubscription().

1072 {
1073  WalRcvExecResult *res;
1074  StringInfoData cmd;
1075  TupleTableSlot *slot;
1076  Oid tableRow[2] = {TEXTOID, TEXTOID};
1077  ListCell *lc;
1078  bool first;
1079  List *tablelist = NIL;
1080 
1081  Assert(list_length(publications) > 0);
1082 
1083  initStringInfo(&cmd);
1084  appendStringInfo(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
1085  " FROM pg_catalog.pg_publication_tables t\n"
1086  " WHERE t.pubname IN (");
1087  first = true;
1088  foreach(lc, publications)
1089  {
1090  char *pubname = strVal(lfirst(lc));
1091 
1092  if (first)
1093  first = false;
1094  else
1095  appendStringInfoString(&cmd, ", ");
1096 
1097  appendStringInfo(&cmd, "%s", quote_literal_cstr(pubname));
1098  }
1099  appendStringInfoString(&cmd, ")");
1100 
1101  res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
1102  pfree(cmd.data);
1103 
1104  if (res->status != WALRCV_OK_TUPLES)
1105  ereport(ERROR,
1106  (errmsg("could not receive list of replicated tables from the publisher: %s",
1107  res->err)));
1108 
1109  /* Process tables. */
1110  slot = MakeSingleTupleTableSlot(res->tupledesc);
1111  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1112  {
1113  char *nspname;
1114  char *relname;
1115  bool isnull;
1116  RangeVar *rv;
1117 
1118  nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1119  Assert(!isnull);
1120  relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1121  Assert(!isnull);
1122 
1123  rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
1124  tablelist = lappend(tablelist, rv);
1125 
1126  ExecClearTuple(slot);
1127  }
1128  ExecDropSingleTupleTableSlot(slot);
1129 
1130  walrcv_clear_result(res);
1131 
1132  return tablelist;
1133 }
#define NIL
Definition: pg_list.h:69
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
#define TEXTOID
Definition: pg_type.h:324
char * pstrdup(const char *in)
Definition: mcxt.c:1077
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
#define strVal(v)
Definition: value.h:54
unsigned int Oid
Definition: postgres_ext.h:31
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:268
void pfree(void *pointer)
Definition: mcxt.c:950
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
TupleDesc tupledesc
Definition: walreceiver.h:190
#define ERROR
Definition: elog.h:43
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:157
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:216
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc)
Definition: execTuples.c:199
#define ereport(elevel, rest)
Definition: elog.h:122
List * lappend(List *list, void *datum)
Definition: list.c:128
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define TextDatumGetCString(d)
Definition: builtins.h:92
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
Tuplestorestate * tuplestore
Definition: walreceiver.h:189
#define Assert(condition)
Definition: c.h:675
#define lfirst(lc)
Definition: pg_list.h:106
WalRcvExecStatus status
Definition: walreceiver.h:187
static int list_length(const List *l)
Definition: pg_list.h:89
int errmsg(const char *fmt,...)
Definition: elog.c:797
Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: heaptuple.c:1141
List
Definition: pg_list.h:45
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition: makefuncs.c:419
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:262
static void parse_subscription_options ( List * options,
bool * connect,
bool * enabled_given,
bool * enabled,
bool * create_slot,
bool * slot_name_given,
char **  slot_name,
bool * copy_data,
char **  synchronous_commit 
)
static

Definition at line 64 of file subscriptioncmds.c.

References Assert, connect, defGetBoolean(), defGetString(), DefElem::defname, ereport, errcode(), errmsg(), ERROR, GUC_ACTION_SET, lfirst, NULL, PGC_BACKEND, PGC_S_TEST, and set_config_option().

Referenced by AlterSubscription(), and CreateSubscription().

68 {
69  ListCell *lc;
70  bool connect_given = false;
71  bool create_slot_given = false;
72  bool copy_data_given = false;
73 
74  /* If connect is specified, the others also need to be. */
75  Assert(!connect || (enabled && create_slot && copy_data));
76 
77  if (connect)
78  *connect = true;
79  if (enabled)
80  {
81  *enabled_given = false;
82  *enabled = true;
83  }
84  if (create_slot)
85  *create_slot = true;
86  if (slot_name)
87  {
88  *slot_name_given = false;
89  *slot_name = NULL;
90  }
91  if (copy_data)
92  *copy_data = true;
93  if (synchronous_commit)
94  *synchronous_commit = NULL;
95 
96  /* Parse options */
97  foreach(lc, options)
98  {
99  DefElem *defel = (DefElem *) lfirst(lc);
100 
101  if (strcmp(defel->defname, "connect") == 0 && connect)
102  {
103  if (connect_given)
104  ereport(ERROR,
105  (errcode(ERRCODE_SYNTAX_ERROR),
106  errmsg("conflicting or redundant options")));
107 
108  connect_given = true;
109  *connect = defGetBoolean(defel);
110  }
111  else if (strcmp(defel->defname, "enabled") == 0 && enabled)
112  {
113  if (*enabled_given)
114  ereport(ERROR,
115  (errcode(ERRCODE_SYNTAX_ERROR),
116  errmsg("conflicting or redundant options")));
117 
118  *enabled_given = true;
119  *enabled = defGetBoolean(defel);
120  }
121  else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
122  {
123  if (create_slot_given)
124  ereport(ERROR,
125  (errcode(ERRCODE_SYNTAX_ERROR),
126  errmsg("conflicting or redundant options")));
127 
128  create_slot_given = true;
129  *create_slot = defGetBoolean(defel);
130  }
131  else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
132  {
133  if (*slot_name_given)
134  ereport(ERROR,
135  (errcode(ERRCODE_SYNTAX_ERROR),
136  errmsg("conflicting or redundant options")));
137 
138  *slot_name_given = true;
139  *slot_name = defGetString(defel);
140 
141  /* Setting slot_name = NONE is treated as no slot name. */
142  if (strcmp(*slot_name, "none") == 0)
143  *slot_name = NULL;
144  }
145  else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
146  {
147  if (copy_data_given)
148  ereport(ERROR,
149  (errcode(ERRCODE_SYNTAX_ERROR),
150  errmsg("conflicting or redundant options")));
151 
152  copy_data_given = true;
153  *copy_data = defGetBoolean(defel);
154  }
155  else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
156  synchronous_commit)
157  {
158  if (*synchronous_commit)
159  ereport(ERROR,
160  (errcode(ERRCODE_SYNTAX_ERROR),
161  errmsg("conflicting or redundant options")));
162 
163  *synchronous_commit = defGetString(defel);
164 
165  /* Test if the given value is valid for synchronous_commit GUC. */
166  (void) set_config_option("synchronous_commit", *synchronous_commit,
167  PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
168  false, 0, false);
169  }
170  else
171  ereport(ERROR,
172  (errcode(ERRCODE_SYNTAX_ERROR),
173  errmsg("unrecognized subscription parameter: %s", defel->defname)));
174  }
175 
176  /*
177  * We've been explicitly asked to not connect, that requires some
178  * additional processing.
179  */
180  if (connect && !*connect)
181  {
182  /* Check for incompatible options from the user. */
183  if (enabled && *enabled_given && *enabled)
184  ereport(ERROR,
185  (errcode(ERRCODE_SYNTAX_ERROR),
186  errmsg("connect = false and enabled = true are mutually exclusive options")));
187 
188  if (create_slot && create_slot_given && *create_slot)
189  ereport(ERROR,
190  (errcode(ERRCODE_SYNTAX_ERROR),
191  errmsg("connect = false and create_slot = true are mutually exclusive options")));
192 
193  if (copy_data && copy_data_given && *copy_data)
194  ereport(ERROR,
195  (errcode(ERRCODE_SYNTAX_ERROR),
196  errmsg("connect = false and copy_data = true are mutually exclusive options")));
197 
198  /* Change the defaults of other options. */
199  *enabled = false;
200  *create_slot = false;
201  *copy_data = false;
202  }
203 
204  /*
205  * Do additional checking for disallowed combination when slot_name = NONE
206  * was used.
207  */
208  if (slot_name && *slot_name_given && !*slot_name)
209  {
210  if (enabled && *enabled_given && *enabled)
211  ereport(ERROR,
212  (errcode(ERRCODE_SYNTAX_ERROR),
213  errmsg("slot_name = NONE and enabled = true are mutually exclusive options")));
214 
215  if (create_slot && create_slot_given && *create_slot)
216  ereport(ERROR,
217  (errcode(ERRCODE_SYNTAX_ERROR),
218  errmsg("slot_name = NONE and create_slot = true are mutually exclusive options")));
219 
220  if (enabled && !*enabled_given && *enabled)
221  ereport(ERROR,
222  (errcode(ERRCODE_SYNTAX_ERROR),
223  errmsg("subscription with slot_name = NONE must also set enabled = false")));
224 
225  if (create_slot && !create_slot_given && *create_slot)
226  ereport(ERROR,
227  (errcode(ERRCODE_SYNTAX_ERROR),
228  errmsg("subscription with slot_name = NONE must also set create_slot = false")));
229  }
230 }
#define connect(s, name, namelen)
Definition: win32.h:373
int errcode(int sqlerrcode)
Definition: elog.c:575
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
char * defGetString(DefElem *def)
Definition: define.c:49
int synchronous_commit
Definition: xact.c:82
#define ereport(elevel, rest)
Definition: elog.h:122
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
#define lfirst(lc)
Definition: pg_list.h:106
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * defname
Definition: parsenodes.h:720
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition: guc.c:5908
static Datum publicationListToArray ( List * publist)
static

Definition at line 236 of file subscriptioncmds.c.

References ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, ALLOCSET_DEFAULT_MINSIZE, AllocSetContextCreate(), construct_array(), CStringGetTextDatum, CurrentMemoryContext, ereport, errcode(), errmsg(), ERROR, lfirst, list_length(), MemoryContextDelete(), MemoryContextSwitchTo(), name, palloc(), PointerGetDatum, strVal, and TEXTOID.

Referenced by AlterSubscription(), and CreateSubscription().

237 {
238  ArrayType *arr;
239  Datum *datums;
240  int j = 0;
241  ListCell *cell;
242  MemoryContext memcxt;
243  MemoryContext oldcxt;
244 
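245  /* Create memory context for temporary allocations. */
246  memcxt = AllocSetContextCreate(CurrentMemoryContext,
247  "publicationListToArray to array",
248  ALLOCSET_DEFAULT_MINSIZE,
249  ALLOCSET_DEFAULT_INITSIZE,
250  ALLOCSET_DEFAULT_MAXSIZE);
251  oldcxt = MemoryContextSwitchTo(memcxt);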
252 
253  datums = palloc(sizeof(text *) * list_length(publist));
254  foreach(cell, publist)
255  {
256  char *name = strVal(lfirst(cell));
257  ListCell *pcell;
258 
259  /* Check for duplicates. */
260  foreach(pcell, publist)
261  {
262  char *pname = strVal(lfirst(pcell));
263 
264  if (name == pname)
265  break;
266 
267  if (strcmp(name, pname) == 0)
268  ereport(ERROR,
269  (errcode(ERRCODE_SYNTAX_ERROR),
270  errmsg("publication name \"%s\" used more than once",
271  pname)));
272  }
273 
274  datums[j++] = CStringGetTextDatum(name);
275  }
276 
277  MemoryContextSwitchTo(oldcxt);
278 
279  arr = construct_array(datums, list_length(publist),
280  TEXTOID, -1, false, 'i');
281  MemoryContextDelete(memcxt);
282 
283  return PointerGetDatum(arr);
284 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define TEXTOID
Definition: pg_type.h:324
#define PointerGetDatum(X)
Definition: postgres.h:562
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ArrayType * construct_array(Datum *elems, int nelems, Oid elmtype, int elmlen, bool elmbyval, char elmalign)
Definition: arrayfuncs.c:3306
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:162
#define ERROR
Definition: elog.h:43
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
uintptr_t Datum
Definition: postgres.h:372
#define lfirst(lc)
Definition: pg_list.h:106
static int list_length(const List *l)
Definition: pg_list.h:89
const char * name
Definition: encode.c:521
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:163
#define CStringGetTextDatum(s)
Definition: builtins.h:91
Definition: c.h:439
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:164