PostgreSQL Source Code  git master
subscriptioncmds.c File Reference
Include dependency graph for subscriptioncmds.c:

Go to the source code of this file.

Functions

static List * fetch_table_list (WalReceiverConn *wrconn, List *publications)
 
static void parse_subscription_options (List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh, bool *binary_given, bool *binary)
 
static Datum publicationListToArray (List *publist)
 
ObjectAddress CreateSubscription (CreateSubscriptionStmt *stmt, bool isTopLevel)
 
static void AlterSubscription_refresh (Subscription *sub, bool copy_data)
 
ObjectAddress AlterSubscription (AlterSubscriptionStmt *stmt)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
static void AlterSubscriptionOwner_internal (Relation rel, HeapTuple tup, Oid newOwnerId)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 

Function Documentation

◆ AlterSubscription()

ObjectAddress AlterSubscription ( AlterSubscriptionStmt * stmt)

Definition at line 649 of file subscriptioncmds.c.

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, AlterSubscription_refresh(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleUpdate(), AlterSubscriptionStmt::conninfo, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, elog, Subscription::enabled, ereport, errcode(), errhint(), errmsg(), ERROR, GETSTRUCT, GetSubscription(), GetUserId(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvokeObjectPostAlterHook, AlterSubscriptionStmt::kind, load_file(), LockSharedObject(), MyDatabaseId, namein(), OBJECT_SUBSCRIPTION, ObjectAddressSet, AlterSubscriptionStmt::options, parse_subscription_options(), pg_subscription_ownercheck(), AlterSubscriptionStmt::publication, publicationListToArray(), Subscription::publications, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, AlterSubscriptionStmt::subname, SUBSCRIPTIONNAME, synchronous_commit, HeapTupleData::t_self, table_close(), table_open(), values, and walrcv_check_conninfo.

Referenced by ProcessUtilitySlow().

650 {
651  Relation rel;
652  ObjectAddress myself;
653  bool nulls[Natts_pg_subscription];
654  bool replaces[Natts_pg_subscription];
655  Datum values[Natts_pg_subscription];
656  HeapTuple tup;
657  Oid subid;
658  bool update_tuple = false;
659  Subscription *sub;
660  Form_pg_subscription form;
661 
662  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
663 
664  /* Fetch the existing tuple. */
665  tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
666  CStringGetDatum(stmt->subname));
667 
668  if (!HeapTupleIsValid(tup))
669  ereport(ERROR,
670  (errcode(ERRCODE_UNDEFINED_OBJECT),
671  errmsg("subscription \"%s\" does not exist",
672  stmt->subname)));
673 
674  form = (Form_pg_subscription) GETSTRUCT(tup);
675  subid = form->oid;
676 
677  /* must be owner */
678  if (!pg_subscription_ownercheck(subid, GetUserId()))
679  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
680  stmt->subname);
681 
682  sub = GetSubscription(subid, false);
683 
684  /* Lock the subscription so nobody else can do anything with it. */
685  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
686 
687  /* Form a new tuple. */
688  memset(values, 0, sizeof(values));
689  memset(nulls, false, sizeof(nulls));
690  memset(replaces, false, sizeof(replaces));
691 
692  switch (stmt->kind)
693  {
694  case ALTER_SUBSCRIPTION_OPTIONS:
695  {
696  char *slotname;
697  bool slotname_given;
698  char *synchronous_commit;
699  bool binary_given;
700  bool binary;
701 
702  parse_subscription_options(stmt->options,
703  NULL, /* no "connect" */
704  NULL, NULL, /* no "enabled" */
705  NULL, /* no "create_slot" */
706  &slotname_given, &slotname,
707  NULL, /* no "copy_data" */
708  &synchronous_commit,
709  NULL, /* no "refresh" */
710  &binary_given, &binary);
711 
712  if (slotname_given)
713  {
714  if (sub->enabled && !slotname)
715  ereport(ERROR,
716  (errcode(ERRCODE_SYNTAX_ERROR),
717  errmsg("cannot set %s for enabled subscription",
718  "slot_name = NONE")));
719 
720  if (slotname)
721  values[Anum_pg_subscription_subslotname - 1] =
722  DirectFunctionCall1(namein, CStringGetDatum(slotname));
723  else
724  nulls[Anum_pg_subscription_subslotname - 1] = true;
725  replaces[Anum_pg_subscription_subslotname - 1] = true;
726  }
727 
728  if (synchronous_commit)
729  {
730  values[Anum_pg_subscription_subsynccommit - 1] =
731  CStringGetTextDatum(synchronous_commit);
732  replaces[Anum_pg_subscription_subsynccommit - 1] = true;
733  }
734 
735  if (binary_given)
736  {
737  values[Anum_pg_subscription_subbinary - 1] =
738  BoolGetDatum(binary);
739  replaces[Anum_pg_subscription_subbinary - 1] = true;
740  }
741 
742  update_tuple = true;
743  break;
744  }
745 
746  case ALTER_SUBSCRIPTION_ENABLED:
747  {
748  bool enabled,
749  enabled_given;
750 
751  parse_subscription_options(stmt->options,
752  NULL, /* no "connect" */
753  &enabled_given, &enabled,
754  NULL, /* no "create_slot" */
755  NULL, NULL, /* no "slot_name" */
756  NULL, /* no "copy_data" */
757  NULL, /* no "synchronous_commit" */
758  NULL, /* no "refresh" */
759  NULL, NULL); /* no "binary" */
760  Assert(enabled_given);
761 
762  if (!sub->slotname && enabled)
763  ereport(ERROR,
764  (errcode(ERRCODE_SYNTAX_ERROR),
765  errmsg("cannot enable subscription that does not have a slot name")));
766 
767  values[Anum_pg_subscription_subenabled - 1] =
768  BoolGetDatum(enabled);
769  replaces[Anum_pg_subscription_subenabled - 1] = true;
770 
771  if (enabled)
772  ApplyLauncherWakeupAtCommit();
773 
774  update_tuple = true;
775  break;
776  }
777 
778  case ALTER_SUBSCRIPTION_CONNECTION:
779  /* Load the library providing us libpq calls. */
780  load_file("libpqwalreceiver", false);
781  /* Check the connection info string. */
782  walrcv_check_conninfo(stmt->conninfo);
783 
784  values[Anum_pg_subscription_subconninfo - 1] =
785  CStringGetTextDatum(stmt->conninfo);
786  replaces[Anum_pg_subscription_subconninfo - 1] = true;
787  update_tuple = true;
788  break;
789 
790  case ALTER_SUBSCRIPTION_PUBLICATION:
791  {
792  bool copy_data;
793  bool refresh;
794 
795  parse_subscription_options(stmt->options,
796  NULL, /* no "connect" */
797  NULL, NULL, /* no "enabled" */
798  NULL, /* no "create_slot" */
799  NULL, NULL, /* no "slot_name" */
800  &copy_data,
801  NULL, /* no "synchronous_commit" */
802  &refresh,
803  NULL, NULL); /* no "binary" */
804 
805  values[Anum_pg_subscription_subpublications - 1] =
806  publicationListToArray(stmt->publication);
807  replaces[Anum_pg_subscription_subpublications - 1] = true;
808 
809  update_tuple = true;
810 
811  /* Refresh if user asked us to. */
812  if (refresh)
813  {
814  if (!sub->enabled)
815  ereport(ERROR,
816  (errcode(ERRCODE_SYNTAX_ERROR),
817  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
818  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
819 
820  /* Make sure refresh sees the new list of publications. */
821  sub->publications = stmt->publication;
822 
823  AlterSubscription_refresh(sub, copy_data);
824  }
825 
826  break;
827  }
828 
829  case ALTER_SUBSCRIPTION_REFRESH:
830  {
831  bool copy_data;
832 
833  if (!sub->enabled)
834  ereport(ERROR,
835  (errcode(ERRCODE_SYNTAX_ERROR),
836  errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
837 
838  parse_subscription_options(stmt->options,
839  NULL, /* no "connect" */
840  NULL, NULL, /* no "enabled" */
841  NULL, /* no "create_slot" */
842  NULL, NULL, /* no "slot_name" */
843  &copy_data,
844  NULL, /* no "synchronous_commit" */
845  NULL, /* no "refresh" */
846  NULL, NULL); /* no "binary" */
847 
848  AlterSubscription_refresh(sub, copy_data);
849 
850  break;
851  }
852 
853  default:
854  elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
855  stmt->kind);
856  }
857 
858  /* Update the catalog if needed. */
859  if (update_tuple)
860  {
861  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
862  replaces);
863 
864  CatalogTupleUpdate(rel, &tup->t_self, tup);
865 
866  heap_freetuple(tup);
867  }
868 
869  table_close(rel, RowExclusiveLock);
870 
871  ObjectAddressSet(myself, SubscriptionRelationId, subid);
872 
873  InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
874 
875  return myself;
876 }
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define RelationGetDescr(relation)
Definition: rel.h:482
Oid GetUserId(void)
Definition: miscinit.c:450
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:399
int errcode(int sqlerrcode)
Definition: elog.c:610
static Datum publicationListToArray(List *publist)
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:624
FormData_pg_subscription * Form_pg_subscription
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
AlterSubscriptionType kind
Definition: parsenodes.h:3558
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
List * publications
int synchronous_commit
Definition: xact.c:83
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
uintptr_t Datum
Definition: postgres.h:367
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh, bool *binary_given, bool *binary)
Oid MyDatabaseId
Definition: globals.c:85
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1002
#define BoolGetDatum(X)
Definition: postgres.h:402
#define ereport(elevel,...)
Definition: elog.h:144
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:745
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
#define CStringGetTextDatum(s)
Definition: builtins.h:86
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:929
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:176
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
static void AlterSubscription_refresh(Subscription *sub, bool copy_data)
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5251

◆ AlterSubscription_refresh()

static void AlterSubscription_refresh ( Subscription * sub,
bool  copy_data 
)
static

Definition at line 540 of file subscriptioncmds.c.

References AccessShareLock, AddSubscriptionRelState(), CheckSubscriptionRelkind(), Subscription::conninfo, DEBUG1, ereport, errmsg(), ERROR, fetch_table_list(), get_namespace_name(), get_rel_name(), get_rel_namespace(), get_rel_relkind(), GetSubscriptionRelations(), InvalidXLogRecPtr, lfirst, list_length(), load_file(), logicalrep_worker_stop_at_commit(), Subscription::name, Subscription::oid, oid_cmp(), palloc(), Subscription::publications, qsort, RangeVarGetRelid, SubscriptionRelState::relid, RangeVar::relname, RemoveSubscriptionRel(), RangeVar::schemaname, walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by AlterSubscription().

541 {
542  char *err;
543  List *pubrel_names;
544  List *subrel_states;
545  Oid *subrel_local_oids;
546  Oid *pubrel_local_oids;
547  ListCell *lc;
548  int off;
549 
550  /* Load the library providing us libpq calls. */
551  load_file("libpqwalreceiver", false);
552 
553  /* Try to connect to the publisher. */
554  wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
555  if (!wrconn)
556  ereport(ERROR,
557  (errmsg("could not connect to the publisher: %s", err)));
558 
559  /* Get the table list from publisher. */
560  pubrel_names = fetch_table_list(wrconn, sub->publications);
561 
562  /* We are done with the remote side, close connection. */
563  walrcv_disconnect(wrconn);
564 
565  /* Get local table list. */
566  subrel_states = GetSubscriptionRelations(sub->oid);
567 
568  /*
569  * Build qsorted array of local table oids for faster lookup. This can
570  * potentially contain all tables in the database so speed of lookup is
571  * important.
572  */
573  subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
574  off = 0;
575  foreach(lc, subrel_states)
576  {
577  SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
578 
579  subrel_local_oids[off++] = relstate->relid;
580  }
581  qsort(subrel_local_oids, list_length(subrel_states),
582  sizeof(Oid), oid_cmp);
583 
584  /*
585  * Walk over the remote tables and try to match them to locally known
586  * tables. If the table is not known locally create a new state for it.
587  *
588  * Also builds array of local oids of remote tables for the next step.
589  */
590  off = 0;
591  pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
592 
593  foreach(lc, pubrel_names)
594  {
595  RangeVar *rv = (RangeVar *) lfirst(lc);
596  Oid relid;
597 
598  relid = RangeVarGetRelid(rv, AccessShareLock, false);
599 
600  /* Check for supported relkind. */
601  CheckSubscriptionRelkind(get_rel_relkind(relid),
602  rv->schemaname, rv->relname);
603 
604  pubrel_local_oids[off++] = relid;
605 
606  if (!bsearch(&relid, subrel_local_oids,
607  list_length(subrel_states), sizeof(Oid), oid_cmp))
608  {
609  AddSubscriptionRelState(sub->oid, relid,
610  copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
611  InvalidXLogRecPtr);
612  ereport(DEBUG1,
613  (errmsg("table \"%s.%s\" added to subscription \"%s\"",
614  rv->schemaname, rv->relname, sub->name)));
615  }
616  }
617 
618  /*
619  * Next remove state for tables we should not care about anymore using the
620  * data we collected above
621  */
622  qsort(pubrel_local_oids, list_length(pubrel_names),
623  sizeof(Oid), oid_cmp);
624 
625  for (off = 0; off < list_length(subrel_states); off++)
626  {
627  Oid relid = subrel_local_oids[off];
628 
629  if (!bsearch(&relid, pubrel_local_oids,
630  list_length(pubrel_names), sizeof(Oid), oid_cmp))
631  {
632  RemoveSubscriptionRel(sub->oid, relid);
633 
634  logicalrep_worker_stop_at_commit(sub->oid, relid);
635 
636  ereport(DEBUG1,
637  (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
638  get_namespace_name(get_rel_namespace(relid)),
639  get_rel_name(relid),
640  sub->name)));
641  }
642  }
643 }
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
WalReceiverConn * wrconn
Definition: worker.c:105
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define DEBUG1
Definition: elog.h:25
void RemoveSubscriptionRel(Oid subid, Oid relid)
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:78
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1915
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1864
#define AccessShareLock
Definition: lockdefs.h:36
unsigned int Oid
Definition: postgres_ext.h:31
char * schemaname
Definition: primnodes.h:67
char * relname
Definition: primnodes.h:68
#define ERROR
Definition: elog.h:43
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3191
List * GetSubscriptionRelations(Oid subid)
List * publications
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
#define ereport(elevel,...)
Definition: elog.h:144
#define lfirst(lc)
Definition: pg_list.h:190
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
int oid_cmp(const void *p1, const void *p2)
Definition: oid.c:336
static int list_length(const List *l)
Definition: pg_list.h:169
#define walrcv_disconnect(conn)
Definition: walreceiver.h:425
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:824
void logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
Definition: launcher.c:549
#define qsort(a, b, c, d)
Definition: port.h:479
Definition: pg_list.h:50
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1840
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:397

◆ AlterSubscriptionOwner()

ObjectAddress AlterSubscriptionOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 1124 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), HeapTupleIsValid, MyDatabaseId, ObjectAddressSet, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONNAME, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

1125 {
1126  Oid subid;
1127  HeapTuple tup;
1128  Relation rel;
1129  ObjectAddress address;
1130  Form_pg_subscription form;
1131 
1132  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1133 
1134  tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1135  CStringGetDatum(name));
1136 
1137  if (!HeapTupleIsValid(tup))
1138  ereport(ERROR,
1139  (errcode(ERRCODE_UNDEFINED_OBJECT),
1140  errmsg("subscription \"%s\" does not exist", name)));
1141 
1142  form = (Form_pg_subscription) GETSTRUCT(tup);
1143  subid = form->oid;
1144 
1145  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1146 
1147  ObjectAddressSet(address, SubscriptionRelationId, subid);
1148 
1149  heap_freetuple(tup);
1150 
1151  table_close(rel, RowExclusiveLock);
1152 
1153  return address;
1154 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
int errcode(int sqlerrcode)
Definition: elog.c:610
FormData_pg_subscription * Form_pg_subscription
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
Oid MyDatabaseId
Definition: globals.c:85
#define ereport(elevel,...)
Definition: elog.h:144
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
const char * name
Definition: encode.c:561
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:176
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterSubscriptionOwner_internal()

static void AlterSubscriptionOwner_internal ( Relation  rel,
HeapTuple  tup,
Oid  newOwnerId 
)
static

Definition at line 1087 of file subscriptioncmds.c.

References aclcheck_error(), ACLCHECK_NOT_OWNER, CatalogTupleUpdate(), changeDependencyOnOwner(), ereport, errcode(), errhint(), errmsg(), ERROR, GETSTRUCT, GetUserId(), InvokeObjectPostAlterHook, NameStr, OBJECT_SUBSCRIPTION, pg_subscription_ownercheck(), superuser_arg(), and HeapTupleData::t_self.

Referenced by AlterSubscriptionOwner(), and AlterSubscriptionOwner_oid().

1088 {
1089  Form_pg_subscription form;
1090 
1091  form = (Form_pg_subscription) GETSTRUCT(tup);
1092 
1093  if (form->subowner == newOwnerId)
1094  return;
1095 
1096  if (!pg_subscription_ownercheck(form->oid, GetUserId()))
1097  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1098  NameStr(form->subname));
1099 
1100  /* New owner must be a superuser */
1101  if (!superuser_arg(newOwnerId))
1102  ereport(ERROR,
1103  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1104  errmsg("permission denied to change owner of subscription \"%s\"",
1105  NameStr(form->subname)),
1106  errhint("The owner of a subscription must be a superuser.")));
1107 
1108  form->subowner = newOwnerId;
1109  CatalogTupleUpdate(rel, &tup->t_self, tup);
1110 
1111  /* Update owner dependency reference */
1112  changeDependencyOnOwner(SubscriptionRelationId,
1113  form->oid,
1114  newOwnerId);
1115 
1116  InvokeObjectPostAlterHook(SubscriptionRelationId,
1117  form->oid, 0);
1118 }
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
Oid GetUserId(void)
Definition: miscinit.c:450
int errcode(int sqlerrcode)
Definition: elog.c:610
FormData_pg_subscription * Form_pg_subscription
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:309
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
#define ereport(elevel,...)
Definition: elog.h:144
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define NameStr(name)
Definition: c.h:622
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5251

◆ AlterSubscriptionOwner_oid()

void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 1160 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, table_close(), and table_open().

Referenced by shdepReassignOwned().

1161 {
1162  HeapTuple tup;
1163  Relation rel;
1164 
1165  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1166 
1167  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
1168 
1169  if (!HeapTupleIsValid(tup))
1170  ereport(ERROR,
1171  (errcode(ERRCODE_UNDEFINED_OBJECT),
1172  errmsg("subscription with OID %u does not exist", subid)));
1173 
1174  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1175 
1176  heap_freetuple(tup);
1177 
1178  table_close(rel, RowExclusiveLock);
1179 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errcode(int sqlerrcode)
Definition: elog.c:610
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define ereport(elevel,...)
Definition: elog.h:144
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:824
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ CreateSubscription()

ObjectAddress CreateSubscription ( CreateSubscriptionStmt * stmt,
bool  isTopLevel 
)

Definition at line 327 of file subscriptioncmds.c.

References AccessShareLock, AddSubscriptionRelState(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleInsert(), CheckSubscriptionRelkind(), connect, CreateSubscriptionStmt::conninfo, create_slot, CRS_NOEXPORT_SNAPSHOT, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, elog, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, fetch_table_list(), get_rel_relkind(), GetNewOidWithIndex(), GetSysCacheOid2, GetUserId(), heap_form_tuple(), heap_freetuple(), InvalidXLogRecPtr, InvokeObjectPostCreateHook, lfirst, load_file(), MyDatabaseId, NAMEDATALEN, namein(), NOTICE, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, CreateSubscriptionStmt::options, parse_subscription_options(), PG_END_TRY, PG_FINALLY, PG_TRY, PreventInTransactionBlock(), CreateSubscriptionStmt::publication, publicationListToArray(), RangeVarGetRelid, recordDependencyOnOwner(), RelationGetDescr, RangeVar::relname, replorigin_create(), RowExclusiveLock, RangeVar::schemaname, snprintf, CreateSubscriptionStmt::subname, SUBSCRIPTIONNAME, SubscriptionObjectIndexId, superuser(), synchronous_commit, table_close(), table_open(), values, walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, WARNING, and wrconn.

Referenced by ProcessUtilitySlow().

328 {
329  Relation rel;
330  ObjectAddress myself;
331  Oid subid;
332  bool nulls[Natts_pg_subscription];
333  Datum values[Natts_pg_subscription];
334  Oid owner = GetUserId();
335  HeapTuple tup;
336  bool connect;
337  bool enabled_given;
338  bool enabled;
339  bool copy_data;
340  char *synchronous_commit;
341  char *conninfo;
342  char *slotname;
343  bool slotname_given;
344  bool binary;
345  bool binary_given;
346  char originname[NAMEDATALEN];
347  bool create_slot;
348  List *publications;
349 
350  /*
351  * Parse and check options.
352  *
353  * Connection and publication should not be specified here.
354  */
355  parse_subscription_options(stmt->options,
356  &connect,
357  &enabled_given, &enabled,
358  &create_slot,
359  &slotname_given, &slotname,
360  &copy_data,
361  &synchronous_commit,
362  NULL, /* no "refresh" */
363  &binary_given, &binary);
364 
365  /*
366  * Since creating a replication slot is not transactional, rolling back
367  * the transaction leaves the created replication slot. So we cannot run
368  * CREATE SUBSCRIPTION inside a transaction block if creating a
369  * replication slot.
370  */
371  if (create_slot)
372  PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
373 
374  if (!superuser())
375  ereport(ERROR,
376  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
377  errmsg("must be superuser to create subscriptions")));
378 
379  /*
380  * If built with appropriate switch, whine when regression-testing
381  * conventions for subscription names are violated.
382  */
383 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
384  if (strncmp(stmt->subname, "regress_", 8) != 0)
385  elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
386 #endif
387 
388  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
389 
390  /* Check if name is used */
391  subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
392  MyDatabaseId, CStringGetDatum(stmt->subname));
393  if (OidIsValid(subid))
394  {
395  ereport(ERROR,
396  (errcode(ERRCODE_DUPLICATE_OBJECT),
397  errmsg("subscription \"%s\" already exists",
398  stmt->subname)));
399  }
400 
401  if (!slotname_given && slotname == NULL)
402  slotname = stmt->subname;
403 
404  /* The default for synchronous_commit of subscriptions is off. */
405  if (synchronous_commit == NULL)
406  synchronous_commit = "off";
407 
408  conninfo = stmt->conninfo;
409  publications = stmt->publication;
410 
411  /* Load the library providing us libpq calls. */
412  load_file("libpqwalreceiver", false);
413 
414  /* Check the connection info string. */
415  walrcv_check_conninfo(conninfo);
416 
417  /* Everything ok, form a new tuple. */
418  memset(values, 0, sizeof(values));
419  memset(nulls, false, sizeof(nulls));
420 
421  subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
422  Anum_pg_subscription_oid);
423  values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
424  values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
425  values[Anum_pg_subscription_subname - 1] =
426  DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
427  values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
428  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
429  values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
430  values[Anum_pg_subscription_subconninfo - 1] =
431  CStringGetTextDatum(conninfo);
432  if (slotname)
433  values[Anum_pg_subscription_subslotname - 1] =
434  DirectFunctionCall1(namein, CStringGetDatum(slotname));
435  else
436  nulls[Anum_pg_subscription_subslotname - 1] = true;
437  values[Anum_pg_subscription_subsynccommit - 1] =
438  CStringGetTextDatum(synchronous_commit);
439  values[Anum_pg_subscription_subpublications - 1] =
440  publicationListToArray(publications);
441 
442  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
443 
444  /* Insert tuple into catalog. */
445  CatalogTupleInsert(rel, tup);
446  heap_freetuple(tup);
447 
448  recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
449 
450  snprintf(originname, sizeof(originname), "pg_%u", subid);
451  replorigin_create(originname);
452 
453  /*
454  * Connect to remote side to execute requested commands and fetch table
455  * info.
456  */
457  if (connect)
458  {
459  char *err;
460  WalReceiverConn *wrconn;
461  List *tables;
462  ListCell *lc;
463  char table_state;
464 
465  /* Try to connect to the publisher. */
466  wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
467  if (!wrconn)
468  ereport(ERROR,
469  (errmsg("could not connect to the publisher: %s", err)));
470 
471  PG_TRY();
472  {
473  /*
474  * Set sync state based on if we were asked to do data copy or
475  * not.
476  */
477  table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
478 
479  /*
480  * Get the table list from publisher and build local table status
481  * info.
482  */
483  tables = fetch_table_list(wrconn, publications);
484  foreach(lc, tables)
485  {
486  RangeVar *rv = (RangeVar *) lfirst(lc);
487  Oid relid;
488 
489  relid = RangeVarGetRelid(rv, AccessShareLock, false);
490 
491  /* Check for supported relkind. */
492  CheckSubscriptionRelkind(get_rel_relkind(relid),
493  rv->schemaname, rv->relname);
494 
495  AddSubscriptionRelState(subid, relid, table_state,
496  InvalidXLogRecPtr);
497  }
498 
499  /*
500  * If requested, create permanent slot for the subscription. We
501  * won't use the initial snapshot for anything, so no need to
502  * export it.
503  */
504  if (create_slot)
505  {
506  Assert(slotname);
507 
508  walrcv_create_slot(wrconn, slotname, false,
509  CRS_NOEXPORT_SNAPSHOT, NULL);
510  ereport(NOTICE,
511  (errmsg("created replication slot \"%s\" on publisher",
512  slotname)));
513  }
514  }
515  PG_FINALLY();
516  {
517  walrcv_disconnect(wrconn);
518  }
519  PG_END_TRY();
520  }
521  else
522  ereport(WARNING,
523  /* translator: %s is an SQL ALTER statement */
524  (errmsg("tables were not subscribed, you will have to run %s to subscribe the tables",
525  "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
526 
527  table_close(rel, RowExclusiveLock);
528 
529  if (enabled)
530  ApplyLauncherWakeupAtCommit();
531 
532  ObjectAddressSet(myself, SubscriptionRelationId, subid);
533 
534  InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
535 
536  return myself;
537 }
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:317
WalReceiverConn * wrconn
Definition: worker.c:105
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
#define RelationGetDescr(relation)
Definition: rel.h:482
Oid GetUserId(void)
Definition: miscinit.c:450
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:78
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:399
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1915
#define AccessShareLock
Definition: lockdefs.h:36
static bool create_slot
int errcode(int sqlerrcode)
Definition: elog.c:610
bool superuser(void)
Definition: superuser.c:46
static Datum publicationListToArray(List *publist)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define connect(s, name, namelen)
Definition: win32_port.h:435
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:624
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:164
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:419
#define OidIsValid(objectId)
Definition: c.h:651
char * schemaname
Definition: primnodes.h:67
#define NAMEDATALEN
char * relname
Definition: primnodes.h:68
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3380
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
int synchronous_commit
Definition: xact.c:83
#define WARNING
Definition: elog.h:40
#define PG_FINALLY()
Definition: elog.h:312
uintptr_t Datum
Definition: postgres.h:367
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh, bool *binary_given, bool *binary)
Oid MyDatabaseId
Definition: globals.c:85
#define BoolGetDatum(X)
Definition: postgres.h:402
#define ereport(elevel,...)
Definition: elog.h:144
#define NOTICE
Definition: elog.h:37
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:190
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:194
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define walrcv_disconnect(conn)
Definition: walreceiver.h:425
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
#define CStringGetTextDatum(s)
Definition: builtins.h:86
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:929
#define PG_TRY()
Definition: elog.h:295
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:31
List
Definition: pg_list.h:50
#define snprintf
Definition: port.h:193
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:221
#define PG_END_TRY()
Definition: elog.h:320
#define SubscriptionObjectIndexId
Definition: indexing.h:363
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:397

◆ DropSubscription()

void DropSubscription ( DropSubscriptionStmt *  stmt,
bool  isTopLevel 
)

Definition at line 882 of file subscriptioncmds.c.

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, appendStringInfo(), Assert, CatalogTupleDelete(), CStringGetDatum, StringInfoData::data, DatumGetName, deleteSharedDependencyRecordsFor(), ereport, WalRcvExecResult::err, errcode(), errdetail(), errhint(), errmsg(), ERROR, EventTriggerSQLDropAddObject(), GETSTRUCT, GetUserId(), HeapTupleIsValid, initStringInfo(), InvalidOid, InvalidRepOriginId, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), DropSubscriptionStmt::missing_ok, MyDatabaseId, NAMEDATALEN, NameStr, NoLock, NOTICE, OBJECT_SUBSCRIPTION, ObjectAddressSet, pfree(), PG_END_TRY, PG_FINALLY, pg_subscription_ownercheck(), PG_TRY, PreventInTransactionBlock(), pstrdup(), quote_identifier(), ReleaseSysCache(), LogicalRepWorker::relid, RemoveSubscriptionRel(), replorigin_by_name(), replorigin_drop(), SearchSysCache2(), snprintf, WalRcvExecResult::status, LogicalRepWorker::subid, subname, DropSubscriptionStmt::subname, SUBSCRIPTIONNAME, SUBSCRIPTIONOID, SysCacheGetAttr(), HeapTupleData::t_self, table_close(), table_open(), TextDatumGetCString, walrcv_clear_result(), walrcv_connect, walrcv_disconnect, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by ProcessUtilitySlow().

883 {
884  Relation rel;
885  ObjectAddress myself;
886  HeapTuple tup;
887  Oid subid;
888  Datum datum;
889  bool isnull;
890  char *subname;
891  char *conninfo;
892  char *slotname;
893  List *subworkers;
894  ListCell *lc;
895  char originname[NAMEDATALEN];
896  char *err = NULL;
897  RepOriginId originid;
898  WalReceiverConn *wrconn = NULL;
899  StringInfoData cmd;
900  Form_pg_subscription form;
901 
902  /*
903  * Lock pg_subscription with AccessExclusiveLock to ensure that the
904  * launcher doesn't restart new worker during dropping the subscription
905  */
906  rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
907 
908  tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
909  CStringGetDatum(stmt->subname));
910 
911  if (!HeapTupleIsValid(tup))
912  {
913  table_close(rel, NoLock);
914 
915  if (!stmt->missing_ok)
916  ereport(ERROR,
917  (errcode(ERRCODE_UNDEFINED_OBJECT),
918  errmsg("subscription \"%s\" does not exist",
919  stmt->subname)));
920  else
921  ereport(NOTICE,
922  (errmsg("subscription \"%s\" does not exist, skipping",
923  stmt->subname)));
924 
925  return;
926  }
927 
928  form = (Form_pg_subscription) GETSTRUCT(tup);
929  subid = form->oid;
930 
931  /* must be owner */
932  if (!pg_subscription_ownercheck(subid, GetUserId()))
933  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
934  stmt->subname);
935 
936  /* DROP hook for the subscription being removed */
937  InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
938 
939  /*
940  * Lock the subscription so nobody else can do anything with it (including
941  * the replication workers).
942  */
943  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
944 
945  /* Get subname */
946  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
947  Anum_pg_subscription_subname, &isnull);
948  Assert(!isnull);
949  subname = pstrdup(NameStr(*DatumGetName(datum)));
950 
951  /* Get conninfo */
952  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
953  Anum_pg_subscription_subconninfo, &isnull);
954  Assert(!isnull);
955  conninfo = TextDatumGetCString(datum);
956 
957  /* Get slotname */
958  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
959  Anum_pg_subscription_subslotname, &isnull);
960  if (!isnull)
961  slotname = pstrdup(NameStr(*DatumGetName(datum)));
962  else
963  slotname = NULL;
964 
965  /*
966  * Since dropping a replication slot is not transactional, the replication
967  * slot stays dropped even if the transaction rolls back. So we cannot
968  * run DROP SUBSCRIPTION inside a transaction block if dropping the
969  * replication slot.
970  *
971  * XXX The command name should really be something like "DROP SUBSCRIPTION
972  * of a subscription that is associated with a replication slot", but we
973  * don't have the proper facilities for that.
974  */
975  if (slotname)
976  PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
977 
978  ObjectAddressSet(myself, SubscriptionRelationId, subid);
979  EventTriggerSQLDropAddObject(&myself, true, true);
980 
981  /* Remove the tuple from catalog. */
982  CatalogTupleDelete(rel, &tup->t_self);
983 
984  ReleaseSysCache(tup);
985 
986  /*
987  * Stop all the subscription workers immediately.
988  *
989  * This is necessary if we are dropping the replication slot, so that the
990  * slot becomes accessible.
991  *
992  * It is also necessary if the subscription is disabled and was disabled
993  * in the same transaction. Then the workers haven't seen the disabling
994  * yet and will still be running, leading to hangs later when we want to
995  * drop the replication origin. If the subscription was disabled before
996  * this transaction, then there shouldn't be any workers left, so this
997  * won't make a difference.
998  *
999  * New workers won't be started because we hold an exclusive lock on the
1000  * subscription till the end of the transaction.
1001  */
1002  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1003  subworkers = logicalrep_workers_find(subid, false);
1004  LWLockRelease(LogicalRepWorkerLock);
1005  foreach(lc, subworkers)
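1006  {
1007  LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
1008 
1009  logicalrep_worker_stop(w->subid, w->relid);
1010  }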
1011  list_free(subworkers);
1012 
1013  /* Clean up dependencies */
1014  deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
1015 
1016  /* Remove any associated relation synchronization states. */
1017  RemoveSubscriptionRel(subid, InvalidOid);
1018 
1019  /* Remove the origin tracking if exists. */
1020  snprintf(originname, sizeof(originname), "pg_%u", subid);
1021  originid = replorigin_by_name(originname, true);
1022  if (originid != InvalidRepOriginId)
1023  replorigin_drop(originid, false);
1024 
1025  /*
1026  * If there is no slot associated with the subscription, we can finish
1027  * here.
1028  */
1029  if (!slotname)
1030  {
1031  table_close(rel, NoLock);
1032  return;
1033  }
1034 
1035  /*
1036  * Otherwise drop the replication slot at the publisher node using the
1037  * replication connection.
1038  */
1039  load_file("libpqwalreceiver", false);
1040 
1041  initStringInfo(&cmd);
1042  appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
1043 
1044  wrconn = walrcv_connect(conninfo, true, subname, &err);
1045  if (wrconn == NULL)
1046  ereport(ERROR,
1047  (errmsg("could not connect to publisher when attempting to "
1048  "drop the replication slot \"%s\"", slotname),
1049  errdetail("The error was: %s", err),
1050  /* translator: %s is an SQL ALTER command */
1051  errhint("Use %s to disassociate the subscription from the slot.",
1052  "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
1053 
1054  PG_TRY();
1055  {
1056  WalRcvExecResult *res;
1057 
1058  res = walrcv_exec(wrconn, cmd.data, 0, NULL);
1059 
1060  if (res->status != WALRCV_OK_COMMAND)
1061  ereport(ERROR,
1062  (errmsg("could not drop the replication slot \"%s\" on publisher",
1063  slotname),
1064  errdetail("The error was: %s", res->err)));
1065  else
1066  ereport(NOTICE,
1067  (errmsg("dropped replication slot \"%s\" on publisher",
1068  slotname)));
1069 
1070  walrcv_clear_result(res);
1071  }
1072  PG_FINALLY();
1073  {
1074  walrcv_disconnect(wrconn);
1075  }
1076  PG_END_TRY();
1077 
1078  pfree(cmd.data);
1079 
1080  table_close(rel, NoLock);
1081 }
WalReceiverConn * wrconn
Definition: worker.c:105
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:259
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
void RemoveSubscriptionRel(Oid subid, Oid relid)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10727
Oid GetUserId(void)
Definition: miscinit.c:450
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:332
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:160
uint16 RepOriginId
Definition: xlogdefs.h:58
int errcode(int sqlerrcode)
Definition: elog.c:610
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:350
FormData_pg_subscription * Form_pg_subscription
unsigned int Oid
Definition: postgres_ext.h:31
NameData subname
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:209
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define NAMEDATALEN
#define DatumGetName(X)
Definition: postgres.h:585
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:429
void pfree(void *pointer)
Definition: mcxt.c:1056
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define NoLock
Definition: lockdefs.h:34
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:452
int errdetail(const char *fmt,...)
Definition: elog.c:957
#define CStringGetDatum(X)
Definition: postgres.h:578
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3380
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:947
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define PG_FINALLY()
Definition: elog.h:312
#define TextDatumGetCString(d)
Definition: builtins.h:87
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1377
Oid MyDatabaseId
Definition: globals.c:85
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1002
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:144
#define NOTICE
Definition: elog.h:37
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:190
WalRcvExecStatus status
Definition: walreceiver.h:211
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:1127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
#define walrcv_disconnect(conn)
Definition: walreceiver.h:425
#define InvalidRepOriginId
Definition: origin.h:33
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:824
void list_free(List *list)
Definition: list.c:1376
#define NameStr(name)
Definition: c.h:622
#define PG_TRY()
Definition: elog.h:295
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
List
Definition: pg_list.h:50
#define snprintf
Definition: port.h:193
#define PG_END_TRY()
Definition: elog.h:320
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:423
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5251
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:397

◆ fetch_table_list()

static List * fetch_table_list ( WalReceiverConn *  wrconn,
List *  publications 
)
static

Definition at line 1186 of file subscriptioncmds.c.

References appendStringInfoChar(), appendStringInfoString(), Assert, StringInfoData::data, ereport, WalRcvExecResult::err, errmsg(), ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), initStringInfo(), lappend(), lfirst, list_length(), makeRangeVar(), MakeSingleTupleTableSlot(), NIL, pfree(), pstrdup(), quote_literal_cstr(), relname, slot_getattr(), WalRcvExecResult::status, strVal, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, and WALRCV_OK_TUPLES.

Referenced by AlterSubscription_refresh(), and CreateSubscription().

1187 {
1188  WalRcvExecResult *res;
1189  StringInfoData cmd;
1190  TupleTableSlot *slot;
1191  Oid tableRow[2] = {TEXTOID, TEXTOID};
1192  ListCell *lc;
1193  bool first;
1194  List *tablelist = NIL;
1195 
1196  Assert(list_length(publications) > 0);
1197 
1198  initStringInfo(&cmd);
1199  appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
1200  " FROM pg_catalog.pg_publication_tables t\n"
1201  " WHERE t.pubname IN (");
1202  first = true;
1203  foreach(lc, publications)
1204  {
1205  char *pubname = strVal(lfirst(lc));
1206 
1207  if (first)
1208  first = false;
1209  else
1210  appendStringInfoString(&cmd, ", ");
1211 
1212  appendStringInfoString(&cmd, quote_literal_cstr(pubname));
1213  }
1214  appendStringInfoChar(&cmd, ')');
1215 
1216  res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
1217  pfree(cmd.data);
1218 
1219  if (res->status != WALRCV_OK_TUPLES)
1220  ereport(ERROR,
1221  (errmsg("could not receive list of replicated tables from the publisher: %s",
1222  res->err)));
1223 
1224  /* Process tables. */
1225  slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1226  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1227  {
1228  char *nspname;
1229  char *relname;
1230  bool isnull;
1231  RangeVar *rv;
1232 
1233  nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1234  Assert(!isnull);
1235  relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1236  Assert(!isnull);
1237 
1238  rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
1239  tablelist = lappend(tablelist, rv);
1240 
1241  ExecClearTuple(slot);
1242  }
1243  ExecDropSingleTupleTableSlot(slot);
1244 
1245  walrcv_clear_result(res);
1246 
1247  return tablelist;
1248 }
#define NIL
Definition: pg_list.h:65
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1208
char * pstrdup(const char *in)
Definition: mcxt.c:1186
#define strVal(v)
Definition: value.h:54
NameData relname
Definition: pg_class.h:38
unsigned int Oid
Definition: postgres_ext.h:31
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:429
void pfree(void *pointer)
Definition: mcxt.c:1056
TupleDesc tupledesc
Definition: walreceiver.h:214
#define ERROR
Definition: elog.h:43
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1224
List * lappend(List *list, void *datum)
Definition: list.c:321
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define TextDatumGetCString(d)
Definition: builtins.h:87
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:381
#define ereport(elevel,...)
Definition: elog.h:144
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
Tuplestorestate * tuplestore
Definition: walreceiver.h:213
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:190
WalRcvExecStatus status
Definition: walreceiver.h:211
static int list_length(const List *l)
Definition: pg_list.h:169
int errmsg(const char *fmt,...)
Definition: elog.c:824
List
Definition: pg_list.h:50
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition: makefuncs.c:422
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:423

◆ parse_subscription_options()

static void parse_subscription_options ( List *  options,
bool *  connect,
bool *  enabled_given,
bool *  enabled,
bool *  create_slot,
bool *  slot_name_given,
char **  slot_name,
bool *  copy_data,
char **  synchronous_commit,
bool *  refresh,
bool *  binary_given,
bool *  binary 
)
static

Definition at line 58 of file subscriptioncmds.c.

References Assert, connect, defGetBoolean(), defGetString(), DefElem::defname, ereport, errcode(), errmsg(), ERROR, GUC_ACTION_SET, lfirst, PGC_BACKEND, PGC_S_TEST, and set_config_option().

Referenced by AlterSubscription(), and CreateSubscription().

67 {
68  ListCell *lc;
69  bool connect_given = false;
70  bool create_slot_given = false;
71  bool copy_data_given = false;
72  bool refresh_given = false;
73 
74  /* If connect is specified, the others also need to be. */
75  Assert(!connect || (enabled && create_slot && copy_data));
76 
77  if (connect)
78  *connect = true;
79  if (enabled)
80  {
81  *enabled_given = false;
82  *enabled = true;
83  }
84  if (create_slot)
85  *create_slot = true;
86  if (slot_name)
87  {
88  *slot_name_given = false;
89  *slot_name = NULL;
90  }
91  if (copy_data)
92  *copy_data = true;
93  if (synchronous_commit)
94  *synchronous_commit = NULL;
95  if (refresh)
96  *refresh = true;
97  if (binary)
98  {
99  *binary_given = false;
100  *binary = false;
101  }
102 
103  /* Parse options */
104  foreach(lc, options)
105  {
106  DefElem *defel = (DefElem *) lfirst(lc);
107 
108  if (strcmp(defel->defname, "connect") == 0 && connect)
109  {
110  if (connect_given)
111  ereport(ERROR,
112  (errcode(ERRCODE_SYNTAX_ERROR),
113  errmsg("conflicting or redundant options")));
114 
115  connect_given = true;
116  *connect = defGetBoolean(defel);
117  }
118  else if (strcmp(defel->defname, "enabled") == 0 && enabled)
119  {
120  if (*enabled_given)
121  ereport(ERROR,
122  (errcode(ERRCODE_SYNTAX_ERROR),
123  errmsg("conflicting or redundant options")));
124 
125  *enabled_given = true;
126  *enabled = defGetBoolean(defel);
127  }
128  else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
129  {
130  if (create_slot_given)
131  ereport(ERROR,
132  (errcode(ERRCODE_SYNTAX_ERROR),
133  errmsg("conflicting or redundant options")));
134 
135  create_slot_given = true;
136  *create_slot = defGetBoolean(defel);
137  }
138  else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
139  {
140  if (*slot_name_given)
141  ereport(ERROR,
142  (errcode(ERRCODE_SYNTAX_ERROR),
143  errmsg("conflicting or redundant options")));
144 
145  *slot_name_given = true;
146  *slot_name = defGetString(defel);
147 
148  /* Setting slot_name = NONE is treated as no slot name. */
149  if (strcmp(*slot_name, "none") == 0)
150  *slot_name = NULL;
151  }
152  else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
153  {
154  if (copy_data_given)
155  ereport(ERROR,
156  (errcode(ERRCODE_SYNTAX_ERROR),
157  errmsg("conflicting or redundant options")));
158 
159  copy_data_given = true;
160  *copy_data = defGetBoolean(defel);
161  }
162  else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
163  synchronous_commit)
164  {
165  if (*synchronous_commit)
166  ereport(ERROR,
167  (errcode(ERRCODE_SYNTAX_ERROR),
168  errmsg("conflicting or redundant options")));
169 
170  *synchronous_commit = defGetString(defel);
171 
172  /* Test if the given value is valid for synchronous_commit GUC. */
173  (void) set_config_option("synchronous_commit", *synchronous_commit,
174  PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET,
175  false, 0, false);
176  }
177  else if (strcmp(defel->defname, "refresh") == 0 && refresh)
178  {
179  if (refresh_given)
180  ereport(ERROR,
181  (errcode(ERRCODE_SYNTAX_ERROR),
182  errmsg("conflicting or redundant options")));
183 
184  refresh_given = true;
185  *refresh = defGetBoolean(defel);
186  }
187  else if (strcmp(defel->defname, "binary") == 0 && binary)
188  {
189  if (*binary_given)
190  ereport(ERROR,
191  (errcode(ERRCODE_SYNTAX_ERROR),
192  errmsg("conflicting or redundant options")));
193 
194  *binary_given = true;
195  *binary = defGetBoolean(defel);
196  }
197  else
198  ereport(ERROR,
199  (errcode(ERRCODE_SYNTAX_ERROR),
200  errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
201  }
202 
203  /*
204  * We've been explicitly asked to not connect, that requires some
205  * additional processing.
206  */
207  if (connect && !*connect)
208  {
209  /* Check for incompatible options from the user. */
210  if (enabled && *enabled_given && *enabled)
211  ereport(ERROR,
212  (errcode(ERRCODE_SYNTAX_ERROR),
213  /*- translator: both %s are strings of the form "option = value" */
214  errmsg("%s and %s are mutually exclusive options",
215  "connect = false", "enabled = true")));
216 
217  if (create_slot && create_slot_given && *create_slot)
218  ereport(ERROR,
219  (errcode(ERRCODE_SYNTAX_ERROR),
220  errmsg("%s and %s are mutually exclusive options",
221  "connect = false", "create_slot = true")));
222 
223  if (copy_data && copy_data_given && *copy_data)
224  ereport(ERROR,
225  (errcode(ERRCODE_SYNTAX_ERROR),
226  errmsg("%s and %s are mutually exclusive options",
227  "connect = false", "copy_data = true")));
228 
229  /* Change the defaults of other options. */
230  *enabled = false;
231  *create_slot = false;
232  *copy_data = false;
233  }
234 
235  /*
236  * Do additional checking for disallowed combination when slot_name = NONE
237  * was used.
238  */
239  if (slot_name && *slot_name_given && !*slot_name)
240  {
241  if (enabled && *enabled_given && *enabled)
242  ereport(ERROR,
243  (errcode(ERRCODE_SYNTAX_ERROR),
244  /*- translator: both %s are strings of the form "option = value" */
245  errmsg("%s and %s are mutually exclusive options",
246  "slot_name = NONE", "enabled = true")));
247 
248  if (create_slot && create_slot_given && *create_slot)
249  ereport(ERROR,
250  (errcode(ERRCODE_SYNTAX_ERROR),
251  errmsg("%s and %s are mutually exclusive options",
252  "slot_name = NONE", "create_slot = true")));
253 
254  if (enabled && !*enabled_given && *enabled)
255  ereport(ERROR,
256  (errcode(ERRCODE_SYNTAX_ERROR),
257  /*- translator: both %s are strings of the form "option = value" */
258  errmsg("subscription with %s must also set %s",
259  "slot_name = NONE", "enabled = false")));
260 
261  if (create_slot && !create_slot_given && *create_slot)
262  ereport(ERROR,
263  (errcode(ERRCODE_SYNTAX_ERROR),
264  errmsg("subscription with %s must also set %s",
265  "slot_name = NONE", "create_slot = false")));
266  }
267 }
static bool create_slot
int errcode(int sqlerrcode)
Definition: elog.c:610
#define connect(s, name, namelen)
Definition: win32_port.h:435
bool defGetBoolean(DefElem *def)
Definition: define.c:111
#define ERROR
Definition: elog.h:43
char * defGetString(DefElem *def)
Definition: define.c:49
int synchronous_commit
Definition: xact.c:83
#define ereport(elevel,...)
Definition: elog.h:144
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:190
int errmsg(const char *fmt,...)
Definition: elog.c:824
char * defname
Definition: parsenodes.h:732
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition: guc.c:6949

◆ publicationListToArray()

static Datum publicationListToArray ( List *  publist)
static

Definition at line 273 of file subscriptioncmds.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, construct_array(), CStringGetTextDatum, CurrentMemoryContext, ereport, errcode(), errmsg(), ERROR, lfirst, list_length(), MemoryContextDelete(), MemoryContextSwitchTo(), name, palloc(), PointerGetDatum, and strVal.

Referenced by AlterSubscription(), and CreateSubscription().

274 {
275  ArrayType *arr;
276  Datum *datums;
277  int j = 0;
278  ListCell *cell;
279  MemoryContext memcxt;
280  MemoryContext oldcxt;
281 
282  /* Create memory context for temporary allocations. */
283  memcxt = AllocSetContextCreate(CurrentMemoryContext,
284  "publicationListToArray to array",
285  ALLOCSET_DEFAULT_SIZES);
286  oldcxt = MemoryContextSwitchTo(memcxt);
287 
288  datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
289 
290  foreach(cell, publist)
291  {
292  char *name = strVal(lfirst(cell));
293  ListCell *pcell;
294 
295  /* Check for duplicates. */
296  foreach(pcell, publist)
297  {
298  char *pname = strVal(lfirst(pcell));
299 
300  if (pcell == cell)
301  break;
302 
303  if (strcmp(name, pname) == 0)
304  ereport(ERROR,
305  (errcode(ERRCODE_SYNTAX_ERROR),
306  errmsg("publication name \"%s\" used more than once",
307  pname)));
308  }
309 
310  datums[j++] = CStringGetTextDatum(name);
311  }
312 
313  MemoryContextSwitchTo(oldcxt);
314 
315  arr = construct_array(datums, list_length(publist),
316  TEXTOID, -1, false, TYPALIGN_INT);
317 
318  MemoryContextDelete(memcxt);
319 
320  return PointerGetDatum(arr);
321 }
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
#define AllocSetContextCreate
Definition: memutils.h:170
#define PointerGetDatum(X)
Definition: postgres.h:556
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ArrayType * construct_array(Datum *elems, int nelems, Oid elmtype, int elmlen, bool elmbyval, char elmalign)
Definition: arrayfuncs.c:3313
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:610
#define ERROR
Definition: elog.h:43
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
uintptr_t Datum
Definition: postgres.h:367
#define ereport(elevel,...)
Definition: elog.h:144
#define lfirst(lc)
Definition: pg_list.h:190
static int list_length(const List *l)
Definition: pg_list.h:169
const char * name
Definition: encode.c:561
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define CStringGetTextDatum(s)
Definition: builtins.h:86