PostgreSQL Source Code  git master
subscriptioncmds.c File Reference
Include dependency graph for subscriptioncmds.c:

Go to the source code of this file.

Functions

static Listfetch_table_list (WalReceiverConn *wrconn, List *publications)
 
static void parse_subscription_options (List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
 
static Datum publicationListToArray (List *publist)
 
ObjectAddress CreateSubscription (CreateSubscriptionStmt *stmt, bool isTopLevel)
 
static void AlterSubscription_refresh (Subscription *sub, bool copy_data)
 
ObjectAddress AlterSubscription (AlterSubscriptionStmt *stmt)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
static void AlterSubscriptionOwner_internal (Relation rel, HeapTuple tup, Oid newOwnerId)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 

Function Documentation

◆ AlterSubscription()

ObjectAddress AlterSubscription ( AlterSubscriptionStmt stmt)

Definition at line 614 of file subscriptioncmds.c.

References AccessExclusiveLock, ACL_KIND_SUBSCRIPTION, aclcheck_error(), ACLCHECK_NOT_OWNER, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, AlterSubscription_refresh(), Anum_pg_subscription_subconninfo, Anum_pg_subscription_subenabled, Anum_pg_subscription_subpublications, Anum_pg_subscription_subslotname, Anum_pg_subscription_subsynccommit, ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleUpdate(), AlterSubscriptionStmt::conninfo, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, elog, Subscription::enabled, ereport, errcode(), errhint(), errmsg(), ERROR, GetSubscription(), GetUserId(), heap_close, heap_freetuple(), heap_modify_tuple(), heap_open(), HeapTupleGetOid, HeapTupleIsValid, InvokeObjectPostAlterHook, AlterSubscriptionStmt::kind, load_file(), LockSharedObject(), MyDatabaseId, namein(), Natts_pg_subscription, ObjectAddressSet, AlterSubscriptionStmt::options, parse_subscription_options(), pg_subscription_ownercheck(), AlterSubscriptionStmt::publication, publicationListToArray(), Subscription::publications, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, AlterSubscriptionStmt::subname, SUBSCRIPTIONNAME, SubscriptionRelationId, synchronous_commit, HeapTupleData::t_self, values, and walrcv_check_conninfo.

Referenced by ProcessUtilitySlow().

615 {
616  Relation rel;
617  ObjectAddress myself;
618  bool nulls[Natts_pg_subscription];
619  bool replaces[Natts_pg_subscription];
621  HeapTuple tup;
622  Oid subid;
623  bool update_tuple = false;
624  Subscription *sub;
625 
627 
628  /* Fetch the existing tuple. */
630  CStringGetDatum(stmt->subname));
631 
632  if (!HeapTupleIsValid(tup))
633  ereport(ERROR,
634  (errcode(ERRCODE_UNDEFINED_OBJECT),
635  errmsg("subscription \"%s\" does not exist",
636  stmt->subname)));
637 
638  /* must be owner */
641  stmt->subname);
642 
643  subid = HeapTupleGetOid(tup);
644  sub = GetSubscription(subid, false);
645 
646  /* Lock the subscription so nobody else can do anything with it. */
648 
649  /* Form a new tuple. */
650  memset(values, 0, sizeof(values));
651  memset(nulls, false, sizeof(nulls));
652  memset(replaces, false, sizeof(replaces));
653 
654  switch (stmt->kind)
655  {
657  {
658  char *slotname;
659  bool slotname_given;
660  char *synchronous_commit;
661 
662  parse_subscription_options(stmt->options, NULL, NULL, NULL,
663  NULL, &slotname_given, &slotname,
664  NULL, &synchronous_commit, NULL);
665 
666  if (slotname_given)
667  {
668  if (sub->enabled && !slotname)
669  ereport(ERROR,
670  (errcode(ERRCODE_SYNTAX_ERROR),
671  errmsg("cannot set slot_name = NONE for enabled subscription")));
672 
673  if (slotname)
676  else
677  nulls[Anum_pg_subscription_subslotname - 1] = true;
678  replaces[Anum_pg_subscription_subslotname - 1] = true;
679  }
680 
681  if (synchronous_commit)
682  {
684  CStringGetTextDatum(synchronous_commit);
685  replaces[Anum_pg_subscription_subsynccommit - 1] = true;
686  }
687 
688  update_tuple = true;
689  break;
690  }
691 
693  {
694  bool enabled,
695  enabled_given;
696 
698  &enabled_given, &enabled, NULL,
699  NULL, NULL, NULL, NULL, NULL);
700  Assert(enabled_given);
701 
702  if (!sub->slotname && enabled)
703  ereport(ERROR,
704  (errcode(ERRCODE_SYNTAX_ERROR),
705  errmsg("cannot enable subscription that does not have a slot name")));
706 
707  values[Anum_pg_subscription_subenabled - 1] =
708  BoolGetDatum(enabled);
709  replaces[Anum_pg_subscription_subenabled - 1] = true;
710 
711  if (enabled)
713 
714  update_tuple = true;
715  break;
716  }
717 
719  /* Load the library providing us libpq calls. */
720  load_file("libpqwalreceiver", false);
721  /* Check the connection info string. */
723 
726  replaces[Anum_pg_subscription_subconninfo - 1] = true;
727  update_tuple = true;
728  break;
729 
731  {
732  bool copy_data;
733  bool refresh;
734 
735  parse_subscription_options(stmt->options, NULL, NULL, NULL,
736  NULL, NULL, NULL, &copy_data,
737  NULL, &refresh);
738 
741  replaces[Anum_pg_subscription_subpublications - 1] = true;
742 
743  update_tuple = true;
744 
745  /* Refresh if user asked us to. */
746  if (refresh)
747  {
748  if (!sub->enabled)
749  ereport(ERROR,
750  (errcode(ERRCODE_SYNTAX_ERROR),
751  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
752  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
753 
754  /* Make sure refresh sees the new list of publications. */
755  sub->publications = stmt->publication;
756 
757  AlterSubscription_refresh(sub, copy_data);
758  }
759 
760  break;
761  }
762 
764  {
765  bool copy_data;
766 
767  if (!sub->enabled)
768  ereport(ERROR,
769  (errcode(ERRCODE_SYNTAX_ERROR),
770  errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
771 
772  parse_subscription_options(stmt->options, NULL, NULL, NULL,
773  NULL, NULL, NULL, &copy_data,
774  NULL, NULL);
775 
776  AlterSubscription_refresh(sub, copy_data);
777 
778  break;
779  }
780 
781  default:
782  elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
783  stmt->kind);
784  }
785 
786  /* Update the catalog if needed. */
787  if (update_tuple)
788  {
789  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
790  replaces);
791 
792  CatalogTupleUpdate(rel, &tup->t_self, tup);
793 
794  heap_freetuple(tup);
795  }
796 
798 
800 
802 
803  return myself;
804 }
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:46
int errhint(const char *fmt,...)
Definition: elog.c:987
#define Anum_pg_subscription_subpublications
#define RelationGetDescr(relation)
Definition: rel.h:437
Oid GetUserId(void)
Definition: miscinit.c:284
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:245
int errcode(int sqlerrcode)
Definition: elog.c:575
static Datum publicationListToArray(List *publist)
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:585
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
unsigned int Oid
Definition: postgres_ext.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define ERROR
Definition: elog.h:43
#define Natts_pg_subscription
ItemPointerData t_self
Definition: htup.h:65
#define SubscriptionRelationId
AlterSubscriptionType kind
Definition: parsenodes.h:3441
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3399
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:584
List * publications
int synchronous_commit
Definition: xact.c:82
#define ereport(elevel, rest)
Definition: elog.h:122
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:163
#define Anum_pg_subscription_subslotname
uintptr_t Datum
Definition: postgres.h:372
Oid MyDatabaseId
Definition: globals.c:77
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:871
#define BoolGetDatum(X)
Definition: postgres.h:408
#define Anum_pg_subscription_subconninfo
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define Assert(condition)
Definition: c.h:670
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:210
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define Anum_pg_subscription_subenabled
static Datum values[MAXATTR]
Definition: bootstrap.c:164
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:700
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:866
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:175
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:794
#define Anum_pg_subscription_subsynccommit
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
static void AlterSubscription_refresh(Subscription *sub, bool copy_data)
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5110

◆ AlterSubscription_refresh()

static void AlterSubscription_refresh ( Subscription sub,
bool  copy_data 
)
static

Definition at line 505 of file subscriptioncmds.c.

References AccessShareLock, CheckSubscriptionRelkind(), Subscription::conninfo, DEBUG1, ereport, errmsg(), ERROR, fetch_table_list(), get_namespace_name(), get_rel_name(), get_rel_namespace(), get_rel_relkind(), GetSubscriptionRelations(), InvalidXLogRecPtr, lfirst, list_length(), load_file(), logicalrep_worker_stop_at_commit(), Subscription::name, Subscription::oid, oid_cmp(), palloc(), Subscription::publications, qsort, RangeVarGetRelid, SubscriptionRelState::relid, RangeVar::relname, RemoveSubscriptionRel(), RangeVar::schemaname, SetSubscriptionRelState(), SUBREL_STATE_INIT, SUBREL_STATE_READY, walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by AlterSubscription().

506 {
507  char *err;
508  List *pubrel_names;
509  List *subrel_states;
510  Oid *subrel_local_oids;
511  Oid *pubrel_local_oids;
512  ListCell *lc;
513  int off;
514 
515  /* Load the library providing us libpq calls. */
516  load_file("libpqwalreceiver", false);
517 
518  /* Try to connect to the publisher. */
519  wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
520  if (!wrconn)
521  ereport(ERROR,
522  (errmsg("could not connect to the publisher: %s", err)));
523 
524  /* Get the table list from publisher. */
525  pubrel_names = fetch_table_list(wrconn, sub->publications);
526 
527  /* We are done with the remote side, close connection. */
529 
530  /* Get local table list. */
531  subrel_states = GetSubscriptionRelations(sub->oid);
532 
533  /*
534  * Build qsorted array of local table oids for faster lookup. This can
535  * potentially contain all tables in the database so speed of lookup is
536  * important.
537  */
538  subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
539  off = 0;
540  foreach(lc, subrel_states)
541  {
543 
544  subrel_local_oids[off++] = relstate->relid;
545  }
546  qsort(subrel_local_oids, list_length(subrel_states),
547  sizeof(Oid), oid_cmp);
548 
549  /*
550  * Walk over the remote tables and try to match them to locally known
551  * tables. If the table is not known locally create a new state for it.
552  *
553  * Also builds array of local oids of remote tables for the next step.
554  */
555  off = 0;
556  pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
557 
558  foreach(lc, pubrel_names)
559  {
560  RangeVar *rv = (RangeVar *) lfirst(lc);
561  Oid relid;
562 
563  relid = RangeVarGetRelid(rv, AccessShareLock, false);
564 
565  /* Check for supported relkind. */
567  rv->schemaname, rv->relname);
568 
569  pubrel_local_oids[off++] = relid;
570 
571  if (!bsearch(&relid, subrel_local_oids,
572  list_length(subrel_states), sizeof(Oid), oid_cmp))
573  {
574  SetSubscriptionRelState(sub->oid, relid,
576  InvalidXLogRecPtr, false);
577  ereport(DEBUG1,
578  (errmsg("table \"%s.%s\" added to subscription \"%s\"",
579  rv->schemaname, rv->relname, sub->name)));
580  }
581  }
582 
583  /*
584  * Next remove state for tables we should not care about anymore using the
585  * data we collected above
586  */
587  qsort(pubrel_local_oids, list_length(pubrel_names),
588  sizeof(Oid), oid_cmp);
589 
590  for (off = 0; off < list_length(subrel_states); off++)
591  {
592  Oid relid = subrel_local_oids[off];
593 
594  if (!bsearch(&relid, pubrel_local_oids,
595  list_length(pubrel_names), sizeof(Oid), oid_cmp))
596  {
597  RemoveSubscriptionRel(sub->oid, relid);
598 
600 
601  ereport(DEBUG1,
602  (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
604  get_rel_name(relid),
605  sub->name)));
606  }
607  }
608 }
#define SUBREL_STATE_INIT
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
WalReceiverConn * wrconn
Definition: worker.c:110
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
void RemoveSubscriptionRel(Oid subid, Oid relid)
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:53
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1801
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1750
#define AccessShareLock
Definition: lockdefs.h:36
unsigned int Oid
Definition: postgres_ext.h:31
char * schemaname
Definition: primnodes.h:67
char * relname
Definition: primnodes.h:68
#define ERROR
Definition: elog.h:43
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3047
List * GetSubscriptionRelations(Oid subid)
List * publications
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
#define ereport(elevel, rest)
Definition: elog.h:122
Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool update_only)
#define lfirst(lc)
Definition: pg_list.h:106
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
int oid_cmp(const void *p1, const void *p2)
Definition: oid.c:336
static int list_length(const List *l)
Definition: pg_list.h:89
#define walrcv_disconnect(conn)
Definition: walreceiver.h:265
void * palloc(Size size)
Definition: mcxt.c:848
int errmsg(const char *fmt,...)
Definition: elog.c:797
void logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
Definition: launcher.c:560
#define qsort(a, b, c, d)
Definition: port.h:408
Definition: pg_list.h:45
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1726
#define SUBREL_STATE_READY
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:243

◆ AlterSubscriptionOwner()

ObjectAddress AlterSubscriptionOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 1054 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, heap_close, heap_freetuple(), heap_open(), HeapTupleGetOid, HeapTupleIsValid, MyDatabaseId, ObjectAddressSet, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONNAME, and SubscriptionRelationId.

Referenced by ExecAlterOwnerStmt().

1055 {
1056  Oid subid;
1057  HeapTuple tup;
1058  Relation rel;
1059  ObjectAddress address;
1060 
1062 
1065 
1066  if (!HeapTupleIsValid(tup))
1067  ereport(ERROR,
1068  (errcode(ERRCODE_UNDEFINED_OBJECT),
1069  errmsg("subscription \"%s\" does not exist", name)));
1070 
1071  subid = HeapTupleGetOid(tup);
1072 
1073  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1074 
1075  ObjectAddressSet(address, SubscriptionRelationId, subid);
1076 
1077  heap_freetuple(tup);
1078 
1080 
1081  return address;
1082 }
int errcode(int sqlerrcode)
Definition: elog.c:575
#define heap_close(r, l)
Definition: heapam.h:97
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define SubscriptionRelationId
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:584
#define ereport(elevel, rest)
Definition: elog.h:122
Oid MyDatabaseId
Definition: globals.c:77
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
const char * name
Definition: encode.c:521
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:700
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:175

◆ AlterSubscriptionOwner_internal()

static void AlterSubscriptionOwner_internal ( Relation  rel,
HeapTuple  tup,
Oid  newOwnerId 
)
static

Definition at line 1017 of file subscriptioncmds.c.

References ACL_KIND_SUBSCRIPTION, aclcheck_error(), ACLCHECK_NOT_OWNER, CatalogTupleUpdate(), changeDependencyOnOwner(), ereport, errcode(), errhint(), errmsg(), ERROR, GETSTRUCT, GetUserId(), HeapTupleGetOid, InvokeObjectPostAlterHook, NameStr, pg_subscription_ownercheck(), SubscriptionRelationId, superuser_arg(), and HeapTupleData::t_self.

Referenced by AlterSubscriptionOwner(), and AlterSubscriptionOwner_oid().

1018 {
1019  Form_pg_subscription form;
1020 
1021  form = (Form_pg_subscription) GETSTRUCT(tup);
1022 
1023  if (form->subowner == newOwnerId)
1024  return;
1025 
1028  NameStr(form->subname));
1029 
1030  /* New owner must be a superuser */
1031  if (!superuser_arg(newOwnerId))
1032  ereport(ERROR,
1033  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1034  errmsg("permission denied to change owner of subscription \"%s\"",
1035  NameStr(form->subname)),
1036  errhint("The owner of a subscription must be a superuser.")));
1037 
1038  form->subowner = newOwnerId;
1039  CatalogTupleUpdate(rel, &tup->t_self, tup);
1040 
1041  /* Update owner dependency reference */
1043  HeapTupleGetOid(tup),
1044  newOwnerId);
1045 
1047  HeapTupleGetOid(tup), 0);
1048 }
int errhint(const char *fmt,...)
Definition: elog.c:987
#define GETSTRUCT(TUP)
Definition: htup_details.h:661
Oid GetUserId(void)
Definition: miscinit.c:284
int errcode(int sqlerrcode)
Definition: elog.c:575
FormData_pg_subscription * Form_pg_subscription
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:304
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define SubscriptionRelationId
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3399
#define ereport(elevel, rest)
Definition: elog.h:122
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:163
bool superuser_arg(Oid roleid)
Definition: superuser.c:57
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:210
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:547
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:700
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5110

◆ AlterSubscriptionOwner_oid()

void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 1088 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_close, heap_freetuple(), heap_open(), HeapTupleIsValid, ObjectIdGetDatum, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, and SubscriptionRelationId.

Referenced by shdepReassignOwned().

1089 {
1090  HeapTuple tup;
1091  Relation rel;
1092 
1094 
1096 
1097  if (!HeapTupleIsValid(tup))
1098  ereport(ERROR,
1099  (errcode(ERRCODE_UNDEFINED_OBJECT),
1100  errmsg("subscription with OID %u does not exist", subid)));
1101 
1102  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1103 
1104  heap_freetuple(tup);
1105 
1107 }
int errcode(int sqlerrcode)
Definition: elog.c:575
#define heap_close(r, l)
Definition: heapam.h:97
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
#define SubscriptionRelationId
#define RowExclusiveLock
Definition: lockdefs.h:38
#define ereport(elevel, rest)
Definition: elog.h:122
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:173
int errmsg(const char *fmt,...)
Definition: elog.c:797

◆ CreateSubscription()

ObjectAddress CreateSubscription ( CreateSubscriptionStmt stmt,
bool  isTopLevel 
)

Definition at line 306 of file subscriptioncmds.c.

References AccessShareLock, Anum_pg_subscription_subconninfo, Anum_pg_subscription_subdbid, Anum_pg_subscription_subenabled, Anum_pg_subscription_subname, Anum_pg_subscription_subowner, Anum_pg_subscription_subpublications, Anum_pg_subscription_subslotname, Anum_pg_subscription_subsynccommit, ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleInsert(), CheckSubscriptionRelkind(), connect, CreateSubscriptionStmt::conninfo, create_slot, CRS_NOEXPORT_SNAPSHOT, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, fetch_table_list(), get_rel_relkind(), GetSysCacheOid2, GetUserId(), heap_close, heap_form_tuple(), heap_freetuple(), heap_open(), InvalidXLogRecPtr, InvokeObjectPostCreateHook, lfirst, load_file(), MyDatabaseId, NAMEDATALEN, namein(), Natts_pg_subscription, NOTICE, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, CreateSubscriptionStmt::options, parse_subscription_options(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PreventTransactionChain(), CreateSubscriptionStmt::publication, publicationListToArray(), RangeVarGetRelid, recordDependencyOnOwner(), RelationGetDescr, RangeVar::relname, replorigin_create(), RowExclusiveLock, RangeVar::schemaname, SetSubscriptionRelState(), snprintf(), CreateSubscriptionStmt::subname, SUBREL_STATE_INIT, SUBREL_STATE_READY, SUBSCRIPTIONNAME, SubscriptionRelationId, superuser(), synchronous_commit, values, walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, WARNING, and wrconn.

Referenced by ProcessUtilitySlow().

307 {
308  Relation rel;
309  ObjectAddress myself;
310  Oid subid;
311  bool nulls[Natts_pg_subscription];
313  Oid owner = GetUserId();
314  HeapTuple tup;
315  bool connect;
316  bool enabled_given;
317  bool enabled;
318  bool copy_data;
319  char *synchronous_commit;
320  char *conninfo;
321  char *slotname;
322  bool slotname_given;
323  char originname[NAMEDATALEN];
324  bool create_slot;
325  List *publications;
326 
327  /*
328  * Parse and check options.
329  *
330  * Connection and publication should not be specified here.
331  */
332  parse_subscription_options(stmt->options, &connect, &enabled_given,
333  &enabled, &create_slot, &slotname_given,
334  &slotname, &copy_data, &synchronous_commit,
335  NULL);
336 
337  /*
338  * Since creating a replication slot is not transactional, rolling back
339  * the transaction leaves the created replication slot. So we cannot run
340  * CREATE SUBSCRIPTION inside a transaction block if creating a
341  * replication slot.
342  */
343  if (create_slot)
344  PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
345 
346  if (!superuser())
347  ereport(ERROR,
348  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
349  (errmsg("must be superuser to create subscriptions"))));
350 
352 
353  /* Check if name is used */
355  CStringGetDatum(stmt->subname));
356  if (OidIsValid(subid))
357  {
358  ereport(ERROR,
360  errmsg("subscription \"%s\" already exists",
361  stmt->subname)));
362  }
363 
364  if (!slotname_given && slotname == NULL)
365  slotname = stmt->subname;
366 
367  /* The default for synchronous_commit of subscriptions is off. */
368  if (synchronous_commit == NULL)
369  synchronous_commit = "off";
370 
371  conninfo = stmt->conninfo;
372  publications = stmt->publication;
373 
374  /* Load the library providing us libpq calls. */
375  load_file("libpqwalreceiver", false);
376 
377  /* Check the connection info string. */
378  walrcv_check_conninfo(conninfo);
379 
380  /* Everything ok, form a new tuple. */
381  memset(values, 0, sizeof(values));
382  memset(nulls, false, sizeof(nulls));
383 
385  values[Anum_pg_subscription_subname - 1] =
387  values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
388  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
390  CStringGetTextDatum(conninfo);
391  if (slotname)
394  else
395  nulls[Anum_pg_subscription_subslotname - 1] = true;
397  CStringGetTextDatum(synchronous_commit);
399  publicationListToArray(publications);
400 
401  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
402 
403  /* Insert tuple into catalog. */
404  subid = CatalogTupleInsert(rel, tup);
405  heap_freetuple(tup);
406 
408 
409  snprintf(originname, sizeof(originname), "pg_%u", subid);
410  replorigin_create(originname);
411 
412  /*
413  * Connect to remote side to execute requested commands and fetch table
414  * info.
415  */
416  if (connect)
417  {
418  XLogRecPtr lsn;
419  char *err;
421  List *tables;
422  ListCell *lc;
423  char table_state;
424 
425  /* Try to connect to the publisher. */
426  wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
427  if (!wrconn)
428  ereport(ERROR,
429  (errmsg("could not connect to the publisher: %s", err)));
430 
431  PG_TRY();
432  {
433  /*
434  * Set sync state based on if we were asked to do data copy or
435  * not.
436  */
437  table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
438 
439  /*
440  * Get the table list from publisher and build local table status
441  * info.
442  */
443  tables = fetch_table_list(wrconn, publications);
444  foreach(lc, tables)
445  {
446  RangeVar *rv = (RangeVar *) lfirst(lc);
447  Oid relid;
448 
449  relid = RangeVarGetRelid(rv, AccessShareLock, false);
450 
451  /* Check for supported relkind. */
453  rv->schemaname, rv->relname);
454 
455  SetSubscriptionRelState(subid, relid, table_state,
456  InvalidXLogRecPtr, false);
457  }
458 
459  /*
460  * If requested, create permanent slot for the subscription. We
461  * won't use the initial snapshot for anything, so no need to
462  * export it.
463  */
464  if (create_slot)
465  {
466  Assert(slotname);
467 
468  walrcv_create_slot(wrconn, slotname, false,
469  CRS_NOEXPORT_SNAPSHOT, &lsn);
470  ereport(NOTICE,
471  (errmsg("created replication slot \"%s\" on publisher",
472  slotname)));
473  }
474  }
475  PG_CATCH();
476  {
477  /* Close the connection in case of failure. */
478  walrcv_disconnect(wrconn);
479  PG_RE_THROW();
480  }
481  PG_END_TRY();
482 
483  /* And we are done with the remote side. */
484  walrcv_disconnect(wrconn);
485  }
486  else
488  (errmsg("tables were not subscribed, you will have to run "
489  "ALTER SUBSCRIPTION ... REFRESH PUBLICATION to "
490  "subscribe the tables")));
491 
493 
494  if (enabled)
496 
498 
500 
501  return myself;
502 }
#define SUBREL_STATE_INIT
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
WalReceiverConn * wrconn
Definition: worker.c:110
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:46
#define Anum_pg_subscription_subpublications
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:145
#define Anum_pg_subscription_subowner
#define RelationGetDescr(relation)
Definition: rel.h:437
Oid GetUserId(void)
Definition: miscinit.c:284
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:53
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:245
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1801
#define AccessShareLock
Definition: lockdefs.h:36
static bool create_slot
Definition: pg_basebackup.c:96
int errcode(int sqlerrcode)
Definition: elog.c:575
bool superuser(void)
Definition: superuser.c:47
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
static Datum publicationListToArray(List *publist)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
#define heap_close(r, l)
Definition: heapam.h:97
#define connect(s, name, namelen)
Definition: win32_port.h:446
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:585
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:159
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
unsigned int Oid
Definition: postgres_ext.h:31
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:261
#define OidIsValid(objectId)
Definition: c.h:576
#define GetSysCacheOid2(cacheId, key1, key2)
Definition: syscache.h:193
char * schemaname
Definition: primnodes.h:67
#define NAMEDATALEN
char * relname
Definition: primnodes.h:68
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:162
#define Natts_pg_subscription
#define Anum_pg_subscription_subname
#define SubscriptionRelationId
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:584
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
RepOriginId replorigin_create(char *roname)
Definition: origin.c:242
int synchronous_commit
Definition: xact.c:82
#define ereport(elevel, rest)
Definition: elog.h:122
#define Anum_pg_subscription_subslotname
#define Anum_pg_subscription_subdbid
#define WARNING
Definition: elog.h:40
Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool update_only)
uintptr_t Datum
Definition: postgres.h:372
Oid MyDatabaseId
Definition: globals.c:77
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
#define BoolGetDatum(X)
Definition: postgres.h:408
#define NOTICE
Definition: elog.h:37
#define Anum_pg_subscription_subconninfo
#define PG_CATCH()
Definition: elog.h:293
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:670
#define lfirst(lc)
Definition: pg_list.h:106
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
#define PG_RE_THROW()
Definition: elog.h:314
#define walrcv_disconnect(conn)
Definition: walreceiver.h:265
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define Anum_pg_subscription_subenabled
static Datum values[MAXATTR]
Definition: bootstrap.c:164
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:866
#define PG_TRY()
Definition: elog.h:284
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
Definition: pg_list.h:45
#define SUBREL_STATE_READY
#define Anum_pg_subscription_subsynccommit
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
#define PG_END_TRY()
Definition: elog.h:300
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3153
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:243

◆ DropSubscription()

void DropSubscription ( DropSubscriptionStmt stmt,
bool  isTopLevel 
)

Definition at line 810 of file subscriptioncmds.c.

References AccessExclusiveLock, ACL_KIND_SUBSCRIPTION, aclcheck_error(), ACLCHECK_NOT_OWNER, Anum_pg_subscription_subconninfo, Anum_pg_subscription_subname, Anum_pg_subscription_subslotname, appendStringInfo(), Assert, CatalogTupleDelete(), CStringGetDatum, StringInfoData::data, DatumGetName, deleteSharedDependencyRecordsFor(), ereport, WalRcvExecResult::err, errcode(), errdetail(), errhint(), errmsg(), ERROR, EventTriggerSQLDropAddObject(), GetUserId(), heap_close, heap_open(), HeapTupleGetOid, HeapTupleIsValid, initStringInfo(), InvalidOid, InvalidRepOriginId, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), DropSubscriptionStmt::missing_ok, MyDatabaseId, NAMEDATALEN, NameStr, NoLock, NOTICE, ObjectAddressSet, pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, pg_subscription_ownercheck(), PG_TRY, PreventTransactionChain(), pstrdup(), quote_identifier(), ReleaseSysCache(), LogicalRepWorker::relid, RemoveSubscriptionRel(), replorigin_by_name(), replorigin_drop(), SearchSysCache2(), snprintf(), WalRcvExecResult::status, LogicalRepWorker::subid, DropSubscriptionStmt::subname, SUBSCRIPTIONNAME, SUBSCRIPTIONOID, SubscriptionRelationId, SysCacheGetAttr(), HeapTupleData::t_self, TextDatumGetCString, walrcv_clear_result(), walrcv_connect, walrcv_disconnect, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by ProcessUtilitySlow().

811 {
812  Relation rel;
813  ObjectAddress myself;
814  HeapTuple tup;
815  Oid subid;
816  Datum datum;
817  bool isnull;
818  char *subname;
819  char *conninfo;
820  char *slotname;
821  List *subworkers;
822  ListCell *lc;
823  char originname[NAMEDATALEN];
824  char *err = NULL;
825  RepOriginId originid;
826  WalReceiverConn *wrconn = NULL;
827  StringInfoData cmd;
828 
829  /*
830  * Lock pg_subscription with AccessExclusiveLock to ensure that the
831  * launcher doesn't restart new worker during dropping the subscription
832  */
834 
836  CStringGetDatum(stmt->subname));
837 
838  if (!HeapTupleIsValid(tup))
839  {
840  heap_close(rel, NoLock);
841 
842  if (!stmt->missing_ok)
843  ereport(ERROR,
844  (errcode(ERRCODE_UNDEFINED_OBJECT),
845  errmsg("subscription \"%s\" does not exist",
846  stmt->subname)));
847  else
848  ereport(NOTICE,
849  (errmsg("subscription \"%s\" does not exist, skipping",
850  stmt->subname)));
851 
852  return;
853  }
854 
855  subid = HeapTupleGetOid(tup);
856 
857  /* must be owner */
858  if (!pg_subscription_ownercheck(subid, GetUserId()))
860  stmt->subname);
861 
862  /* DROP hook for the subscription being removed */
864 
865  /*
866  * Lock the subscription so nobody else can do anything with it (including
867  * the replication workers).
868  */
870 
871  /* Get subname */
872  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
874  Assert(!isnull);
875  subname = pstrdup(NameStr(*DatumGetName(datum)));
876 
877  /* Get conninfo */
878  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
880  Assert(!isnull);
881  conninfo = TextDatumGetCString(datum);
882 
883  /* Get slotname */
884  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
886  if (!isnull)
887  slotname = pstrdup(NameStr(*DatumGetName(datum)));
888  else
889  slotname = NULL;
890 
891  /*
892  * Since dropping a replication slot is not transactional, the replication
893  * slot stays dropped even if the transaction rolls back. So we cannot
894  * run DROP SUBSCRIPTION inside a transaction block if dropping the
895  * replication slot.
896  *
897  * XXX The command name should really be something like "DROP SUBSCRIPTION
898  * of a subscription that is associated with a replication slot", but we
899  * don't have the proper facilities for that.
900  */
901  if (slotname)
902  PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
903 
904 
906  EventTriggerSQLDropAddObject(&myself, true, true);
907 
908  /* Remove the tuple from catalog. */
909  CatalogTupleDelete(rel, &tup->t_self);
910 
911  ReleaseSysCache(tup);
912 
913  /*
914  * Stop all the subscription workers immediately.
915  *
916  * This is necessary if we are dropping the replication slot, so that the
917  * slot becomes accessible.
918  *
919  * It is also necessary if the subscription is disabled and was disabled
920  * in the same transaction. Then the workers haven't seen the disabling
921  * yet and will still be running, leading to hangs later when we want to
922  * drop the replication origin. If the subscription was disabled before
923  * this transaction, then there shouldn't be any workers left, so this
924  * won't make a difference.
925  *
926  * New workers won't be started because we hold an exclusive lock on the
927  * subscription till the end of the transaction.
928  */
929  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
930  subworkers = logicalrep_workers_find(subid, false);
931  LWLockRelease(LogicalRepWorkerLock);
932  foreach(lc, subworkers)
933  {
935 
937  }
938  list_free(subworkers);
939 
940  /* Clean up dependencies */
942 
943  /* Remove any associated relation synchronization states. */
945 
946  /* Remove the origin tracking if exists. */
947  snprintf(originname, sizeof(originname), "pg_%u", subid);
948  originid = replorigin_by_name(originname, true);
949  if (originid != InvalidRepOriginId)
950  replorigin_drop(originid, false);
951 
952  /*
953  * If there is no slot associated with the subscription, we can finish
954  * here.
955  */
956  if (!slotname)
957  {
958  heap_close(rel, NoLock);
959  return;
960  }
961 
962  /*
963  * Otherwise drop the replication slot at the publisher node using the
964  * replication connection.
965  */
966  load_file("libpqwalreceiver", false);
967 
968  initStringInfo(&cmd);
969  appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
970 
971  wrconn = walrcv_connect(conninfo, true, subname, &err);
972  if (wrconn == NULL)
973  ereport(ERROR,
974  (errmsg("could not connect to publisher when attempting to "
975  "drop the replication slot \"%s\"", slotname),
976  errdetail("The error was: %s", err),
977  errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
978  "to disassociate the subscription from the slot.")));
979 
980  PG_TRY();
981  {
982  WalRcvExecResult *res;
983 
984  res = walrcv_exec(wrconn, cmd.data, 0, NULL);
985 
986  if (res->status != WALRCV_OK_COMMAND)
987  ereport(ERROR,
988  (errmsg("could not drop the replication slot \"%s\" on publisher",
989  slotname),
990  errdetail("The error was: %s", res->err)));
991  else
992  ereport(NOTICE,
993  (errmsg("dropped replication slot \"%s\" on publisher",
994  slotname)));
995 
996  walrcv_clear_result(res);
997  }
998  PG_CATCH();
999  {
1000  /* Close the connection in case of failure */
1001  walrcv_disconnect(wrconn);
1002  PG_RE_THROW();
1003  }
1004  PG_END_TRY();
1005 
1006  walrcv_disconnect(wrconn);
1007 
1008  pfree(cmd.data);
1009 
1010  heap_close(rel, NoLock);
1011 }
WalReceiverConn * wrconn
Definition: worker.c:110
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:262
int errhint(const char *fmt,...)
Definition: elog.c:987
void RemoveSubscriptionRel(Oid subid, Oid relid)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10406
Oid GetUserId(void)
Definition: miscinit.c:284
char * pstrdup(const char *in)
Definition: mcxt.c:1076
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:334
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:154
uint16 RepOriginId
Definition: xlogdefs.h:51
int errcode(int sqlerrcode)
Definition: elog.c:575
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:255
#define heap_close(r, l)
Definition: heapam.h:97
unsigned int Oid
Definition: postgres_ext.h:31
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
#define NAMEDATALEN
#define DatumGetName(X)
Definition: postgres.h:591
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:269
void pfree(void *pointer)
Definition: mcxt.c:949
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
#define ERROR
Definition: elog.h:43
#define Anum_pg_subscription_subname
ItemPointerData t_self
Definition: htup.h:65
#define SubscriptionRelationId
#define NoLock
Definition: lockdefs.h:34
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3399
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:455
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define CStringGetDatum(X)
Definition: postgres.h:584
#define ereport(elevel, rest)
Definition: elog.h:122
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:823
#define Anum_pg_subscription_subslotname
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define TextDatumGetCString(d)
Definition: builtins.h:92
uintptr_t Datum
Definition: postgres.h:372
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1160
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1368
Oid MyDatabaseId
Definition: globals.c:77
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:871
#define InvalidOid
Definition: postgres_ext.h:36
#define NOTICE
Definition: elog.h:37
#define Anum_pg_subscription_subconninfo
#define PG_CATCH()
Definition: elog.h:293
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define Assert(condition)
Definition: c.h:670
#define lfirst(lc)
Definition: pg_list.h:106
WalRcvExecStatus status
Definition: walreceiver.h:188
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:1123
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
#define PG_RE_THROW()
Definition: elog.h:314
#define walrcv_disconnect(conn)
Definition: walreceiver.h:265
#define InvalidRepOriginId
Definition: origin.h:34
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:797
void list_free(List *list)
Definition: list.c:1133
#define NameStr(name)
Definition: c.h:547
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:700
#define PG_TRY()
Definition: elog.h:284
Definition: pg_list.h:45
#define PG_END_TRY()
Definition: elog.h:300
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:263
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3153
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5110
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:243

◆ fetch_table_list()

static List * fetch_table_list ( WalReceiverConn wrconn,
List publications 
)
static

Definition at line 1114 of file subscriptioncmds.c.

References appendStringInfoChar(), appendStringInfoString(), Assert, StringInfoData::data, ereport, WalRcvExecResult::err, errmsg(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), initStringInfo(), lappend(), lfirst, list_length(), makeRangeVar(), MakeSingleTupleTableSlot(), NIL, pfree(), pstrdup(), quote_literal_cstr(), slot_getattr(), WalRcvExecResult::status, strVal, TextDatumGetCString, TEXTOID, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, and WALRCV_OK_TUPLES.

Referenced by AlterSubscription_refresh(), and CreateSubscription().

1115 {
1116  WalRcvExecResult *res;
1117  StringInfoData cmd;
1118  TupleTableSlot *slot;
1119  Oid tableRow[2] = {TEXTOID, TEXTOID};
1120  ListCell *lc;
1121  bool first;
1122  List *tablelist = NIL;
1123 
1124  Assert(list_length(publications) > 0);
1125 
1126  initStringInfo(&cmd);
1127  appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
1128  " FROM pg_catalog.pg_publication_tables t\n"
1129  " WHERE t.pubname IN (");
1130  first = true;
1131  foreach(lc, publications)
1132  {
1133  char *pubname = strVal(lfirst(lc));
1134 
1135  if (first)
1136  first = false;
1137  else
1138  appendStringInfoString(&cmd, ", ");
1139 
1141  }
1142  appendStringInfoChar(&cmd, ')');
1143 
1144  res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
1145  pfree(cmd.data);
1146 
1147  if (res->status != WALRCV_OK_TUPLES)
1148  ereport(ERROR,
1149  (errmsg("could not receive list of replicated tables from the publisher: %s",
1150  res->err)));
1151 
1152  /* Process tables. */
1153  slot = MakeSingleTupleTableSlot(res->tupledesc);
1154  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1155  {
1156  char *nspname;
1157  char *relname;
1158  bool isnull;
1159  RangeVar *rv;
1160 
1161  nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1162  Assert(!isnull);
1163  relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1164  Assert(!isnull);
1165 
1166  rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
1167  tablelist = lappend(tablelist, rv);
1168 
1169  ExecClearTuple(slot);
1170  }
1172 
1173  walrcv_clear_result(res);
1174 
1175  return tablelist;
1176 }
#define NIL
Definition: pg_list.h:69
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
#define TEXTOID
Definition: pg_type.h:324
char * pstrdup(const char *in)
Definition: mcxt.c:1076
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
#define strVal(v)
Definition: value.h:54
unsigned int Oid
Definition: postgres_ext.h:31
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:269
void pfree(void *pointer)
Definition: mcxt.c:949
TupleDesc tupledesc
Definition: walreceiver.h:191
#define ERROR
Definition: elog.h:43
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:157
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:216
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc)
Definition: execTuples.c:199
#define ereport(elevel, rest)
Definition: elog.h:122
List * lappend(List *list, void *datum)
Definition: list.c:128
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:169
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define TextDatumGetCString(d)
Definition: builtins.h:92
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
Tuplestorestate * tuplestore
Definition: walreceiver.h:190
#define Assert(condition)
Definition: c.h:670
#define lfirst(lc)
Definition: pg_list.h:106
WalRcvExecStatus status
Definition: walreceiver.h:188
static int list_length(const List *l)
Definition: pg_list.h:89
int errmsg(const char *fmt,...)
Definition: elog.c:797
Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: heaptuple.c:1142
Definition: pg_list.h:45
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition: makefuncs.c:421
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:263

◆ parse_subscription_options()

static void parse_subscription_options ( List options,
bool connect,
bool enabled_given,
bool enabled,
bool create_slot,
bool slot_name_given,
char **  slot_name,
bool copy_data,
char **  synchronous_commit,
bool refresh 
)
static

Definition at line 64 of file subscriptioncmds.c.

References Assert, connect, defGetBoolean(), defGetString(), DefElem::defname, ereport, errcode(), errmsg(), ERROR, GUC_ACTION_SET, lfirst, PGC_BACKEND, PGC_S_TEST, and set_config_option().

Referenced by AlterSubscription(), and CreateSubscription().

69 {
70  ListCell *lc;
71  bool connect_given = false;
72  bool create_slot_given = false;
73  bool copy_data_given = false;
74  bool refresh_given = false;
75 
76  /* If connect is specified, the others also need to be. */
77  Assert(!connect || (enabled && create_slot && copy_data));
78 
79  if (connect)
80  *connect = true;
81  if (enabled)
82  {
83  *enabled_given = false;
84  *enabled = true;
85  }
86  if (create_slot)
87  *create_slot = true;
88  if (slot_name)
89  {
90  *slot_name_given = false;
91  *slot_name = NULL;
92  }
93  if (copy_data)
94  *copy_data = true;
96  *synchronous_commit = NULL;
97  if (refresh)
98  *refresh = true;
99 
100  /* Parse options */
101  foreach(lc, options)
102  {
103  DefElem *defel = (DefElem *) lfirst(lc);
104 
105  if (strcmp(defel->defname, "connect") == 0 && connect)
106  {
107  if (connect_given)
108  ereport(ERROR,
109  (errcode(ERRCODE_SYNTAX_ERROR),
110  errmsg("conflicting or redundant options")));
111 
112  connect_given = true;
113  *connect = defGetBoolean(defel);
114  }
115  else if (strcmp(defel->defname, "enabled") == 0 && enabled)
116  {
117  if (*enabled_given)
118  ereport(ERROR,
119  (errcode(ERRCODE_SYNTAX_ERROR),
120  errmsg("conflicting or redundant options")));
121 
122  *enabled_given = true;
123  *enabled = defGetBoolean(defel);
124  }
125  else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
126  {
127  if (create_slot_given)
128  ereport(ERROR,
129  (errcode(ERRCODE_SYNTAX_ERROR),
130  errmsg("conflicting or redundant options")));
131 
132  create_slot_given = true;
133  *create_slot = defGetBoolean(defel);
134  }
135  else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
136  {
137  if (*slot_name_given)
138  ereport(ERROR,
139  (errcode(ERRCODE_SYNTAX_ERROR),
140  errmsg("conflicting or redundant options")));
141 
142  *slot_name_given = true;
143  *slot_name = defGetString(defel);
144 
145  /* Setting slot_name = NONE is treated as no slot name. */
146  if (strcmp(*slot_name, "none") == 0)
147  *slot_name = NULL;
148  }
149  else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
150  {
151  if (copy_data_given)
152  ereport(ERROR,
153  (errcode(ERRCODE_SYNTAX_ERROR),
154  errmsg("conflicting or redundant options")));
155 
156  copy_data_given = true;
157  *copy_data = defGetBoolean(defel);
158  }
159  else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
161  {
162  if (*synchronous_commit)
163  ereport(ERROR,
164  (errcode(ERRCODE_SYNTAX_ERROR),
165  errmsg("conflicting or redundant options")));
166 
168 
169  /* Test if the given value is valid for synchronous_commit GUC. */
170  (void) set_config_option("synchronous_commit", *synchronous_commit,
172  false, 0, false);
173  }
174  else if (strcmp(defel->defname, "refresh") == 0 && refresh)
175  {
176  if (refresh_given)
177  ereport(ERROR,
178  (errcode(ERRCODE_SYNTAX_ERROR),
179  errmsg("conflicting or redundant options")));
180 
181  refresh_given = true;
182  *refresh = defGetBoolean(defel);
183  }
184  else
185  ereport(ERROR,
186  (errcode(ERRCODE_SYNTAX_ERROR),
187  errmsg("unrecognized subscription parameter: %s", defel->defname)));
188  }
189 
190  /*
191  * We've been explicitly asked to not connect, that requires some
192  * additional processing.
193  */
194  if (connect && !*connect)
195  {
196  /* Check for incompatible options from the user. */
197  if (enabled && *enabled_given && *enabled)
198  ereport(ERROR,
199  (errcode(ERRCODE_SYNTAX_ERROR),
200  errmsg("connect = false and enabled = true are mutually exclusive options")));
201 
202  if (create_slot && create_slot_given && *create_slot)
203  ereport(ERROR,
204  (errcode(ERRCODE_SYNTAX_ERROR),
205  errmsg("connect = false and create_slot = true are mutually exclusive options")));
206 
207  if (copy_data && copy_data_given && *copy_data)
208  ereport(ERROR,
209  (errcode(ERRCODE_SYNTAX_ERROR),
210  errmsg("connect = false and copy_data = true are mutually exclusive options")));
211 
212  /* Change the defaults of other options. */
213  *enabled = false;
214  *create_slot = false;
215  *copy_data = false;
216  }
217 
218  /*
219  * Do additional checking for disallowed combination when slot_name = NONE
220  * was used.
221  */
222  if (slot_name && *slot_name_given && !*slot_name)
223  {
224  if (enabled && *enabled_given && *enabled)
225  ereport(ERROR,
226  (errcode(ERRCODE_SYNTAX_ERROR),
227  errmsg("slot_name = NONE and enabled = true are mutually exclusive options")));
228 
229  if (create_slot && create_slot_given && *create_slot)
230  ereport(ERROR,
231  (errcode(ERRCODE_SYNTAX_ERROR),
232  errmsg("slot_name = NONE and create_slot = true are mutually exclusive options")));
233 
234  if (enabled && !*enabled_given && *enabled)
235  ereport(ERROR,
236  (errcode(ERRCODE_SYNTAX_ERROR),
237  errmsg("subscription with slot_name = NONE must also set enabled = false")));
238 
239  if (create_slot && !create_slot_given && *create_slot)
240  ereport(ERROR,
241  (errcode(ERRCODE_SYNTAX_ERROR),
242  errmsg("subscription with slot_name = NONE must also set create_slot = false")));
243  }
244 }
static bool create_slot
Definition: pg_basebackup.c:96
int errcode(int sqlerrcode)
Definition: elog.c:575
#define connect(s, name, namelen)
Definition: win32_port.h:446
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
char * defGetString(DefElem *def)
Definition: define.c:49
int synchronous_commit
Definition: xact.c:82
#define ereport(elevel, rest)
Definition: elog.h:122
#define Assert(condition)
Definition: c.h:670
#define lfirst(lc)
Definition: pg_list.h:106
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * defname
Definition: parsenodes.h:719
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition: guc.c:5927

◆ publicationListToArray()

static Datum publicationListToArray ( List publist)
static

Definition at line 250 of file subscriptioncmds.c.

References ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, ALLOCSET_DEFAULT_MINSIZE, AllocSetContextCreate(), construct_array(), CStringGetTextDatum, CurrentMemoryContext, ereport, errcode(), errmsg(), ERROR, lfirst, list_length(), MemoryContextDelete(), MemoryContextSwitchTo(), name, palloc(), PointerGetDatum, strVal, and TEXTOID.

Referenced by AlterSubscription(), and CreateSubscription().

251 {
252  ArrayType *arr;
253  Datum *datums;
254  int j = 0;
255  ListCell *cell;
256  MemoryContext memcxt;
257  MemoryContext oldcxt;
258 
259  /* Create memory context for temporary allocations. */
261  "publicationListToArray to array",
265  oldcxt = MemoryContextSwitchTo(memcxt);
266 
267  datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
268 
269  foreach(cell, publist)
270  {
271  char *name = strVal(lfirst(cell));
272  ListCell *pcell;
273 
274  /* Check for duplicates. */
275  foreach(pcell, publist)
276  {
277  char *pname = strVal(lfirst(pcell));
278 
279  if (pcell == cell)
280  break;
281 
282  if (strcmp(name, pname) == 0)
283  ereport(ERROR,
284  (errcode(ERRCODE_SYNTAX_ERROR),
285  errmsg("publication name \"%s\" used more than once",
286  pname)));
287  }
288 
289  datums[j++] = CStringGetTextDatum(name);
290  }
291 
292  MemoryContextSwitchTo(oldcxt);
293 
294  arr = construct_array(datums, list_length(publist),
295  TEXTOID, -1, false, 'i');
296 
297  MemoryContextDelete(memcxt);
298 
299  return PointerGetDatum(arr);
300 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
#define TEXTOID
Definition: pg_type.h:324
#define PointerGetDatum(X)
Definition: postgres.h:562
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ArrayType * construct_array(Datum *elems, int nelems, Oid elmtype, int elmlen, bool elmbyval, char elmalign)
Definition: arrayfuncs.c:3279
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:162
#define ERROR
Definition: elog.h:43
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define ereport(elevel, rest)
Definition: elog.h:122
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
uintptr_t Datum
Definition: postgres.h:372
#define lfirst(lc)
Definition: pg_list.h:106
static int list_length(const List *l)
Definition: pg_list.h:89
const char * name
Definition: encode.c:521
void * palloc(Size size)
Definition: mcxt.c:848
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:163
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:164