PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
subscriptioncmds.h File Reference
Include dependency graph for subscriptioncmds.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

ObjectAddress CreateSubscription (CreateSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscription (AlterSubscriptionStmt *stmt)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 

Function Documentation

ObjectAddress AlterSubscription ( AlterSubscriptionStmt * stmt )

Definition at line 612 of file subscriptioncmds.c.

References AccessExclusiveLock, ACL_KIND_SUBSCRIPTION, aclcheck_error(), ACLCHECK_NOT_OWNER, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, AlterSubscription_refresh(), Anum_pg_subscription_subconninfo, Anum_pg_subscription_subenabled, Anum_pg_subscription_subpublications, Anum_pg_subscription_subslotname, Anum_pg_subscription_subsynccommit, ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleUpdate(), AlterSubscriptionStmt::conninfo, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, elog, Subscription::enabled, ereport, errcode(), errhint(), errmsg(), ERROR, GetSubscription(), GetUserId(), heap_close, heap_freetuple(), heap_modify_tuple(), heap_open(), HeapTupleGetOid, HeapTupleIsValid, InvokeObjectPostAlterHook, AlterSubscriptionStmt::kind, load_file(), LockSharedObject(), MyDatabaseId, namein(), Natts_pg_subscription, ObjectAddressSet, AlterSubscriptionStmt::options, parse_subscription_options(), pg_subscription_ownercheck(), AlterSubscriptionStmt::publication, publicationListToArray(), Subscription::publications, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, AlterSubscriptionStmt::subname, SUBSCRIPTIONNAME, SubscriptionRelationId, synchronous_commit, HeapTupleData::t_self, values, and walrcv_check_conninfo.

Referenced by ProcessUtilitySlow().

613 {
614  Relation rel;
615  ObjectAddress myself;
616  bool nulls[Natts_pg_subscription];
617  bool replaces[Natts_pg_subscription];
619  HeapTuple tup;
620  Oid subid;
621  bool update_tuple = false;
622  Subscription *sub;
623 
625 
626  /* Fetch the existing tuple. */
628  CStringGetDatum(stmt->subname));
629 
630  if (!HeapTupleIsValid(tup))
631  ereport(ERROR,
632  (errcode(ERRCODE_UNDEFINED_OBJECT),
633  errmsg("subscription \"%s\" does not exist",
634  stmt->subname)));
635 
636  /* must be owner */
639  stmt->subname);
640 
641  subid = HeapTupleGetOid(tup);
642  sub = GetSubscription(subid, false);
643 
644  /* Lock the subscription so nobody else can do anything with it. */
646 
647  /* Form a new tuple. */
648  memset(values, 0, sizeof(values));
649  memset(nulls, false, sizeof(nulls));
650  memset(replaces, false, sizeof(replaces));
651 
652  switch (stmt->kind)
653  {
655  {
656  char *slotname;
657  bool slotname_given;
658  char *synchronous_commit;
659 
660  parse_subscription_options(stmt->options, NULL, NULL, NULL,
661  NULL, &slotname_given, &slotname,
662  NULL, &synchronous_commit, NULL);
663 
664  if (slotname_given)
665  {
666  if (sub->enabled && !slotname)
667  ereport(ERROR,
668  (errcode(ERRCODE_SYNTAX_ERROR),
669  errmsg("cannot set slot_name = NONE for enabled subscription")));
670 
671  if (slotname)
674  else
675  nulls[Anum_pg_subscription_subslotname - 1] = true;
676  replaces[Anum_pg_subscription_subslotname - 1] = true;
677  }
678 
679  if (synchronous_commit)
680  {
682  CStringGetTextDatum(synchronous_commit);
683  replaces[Anum_pg_subscription_subsynccommit - 1] = true;
684  }
685 
686  update_tuple = true;
687  break;
688  }
689 
691  {
692  bool enabled,
693  enabled_given;
694 
696  &enabled_given, &enabled, NULL,
697  NULL, NULL, NULL, NULL, NULL);
698  Assert(enabled_given);
699 
700  if (!sub->slotname && enabled)
701  ereport(ERROR,
702  (errcode(ERRCODE_SYNTAX_ERROR),
703  errmsg("cannot enable subscription that does not have a slot name")));
704 
705  values[Anum_pg_subscription_subenabled - 1] =
706  BoolGetDatum(enabled);
707  replaces[Anum_pg_subscription_subenabled - 1] = true;
708 
709  if (enabled)
711 
712  update_tuple = true;
713  break;
714  }
715 
717  /* Load the library providing us libpq calls. */
718  load_file("libpqwalreceiver", false);
719  /* Check the connection info string. */
721 
724  replaces[Anum_pg_subscription_subconninfo - 1] = true;
725  update_tuple = true;
726  break;
727 
729  {
730  bool copy_data;
731  bool refresh;
732 
733  parse_subscription_options(stmt->options, NULL, NULL, NULL,
734  NULL, NULL, NULL, &copy_data,
735  NULL, &refresh);
736 
739  replaces[Anum_pg_subscription_subpublications - 1] = true;
740 
741  update_tuple = true;
742 
743  /* Refresh if user asked us to. */
744  if (refresh)
745  {
746  if (!sub->enabled)
747  ereport(ERROR,
748  (errcode(ERRCODE_SYNTAX_ERROR),
749  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
750  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
751 
752  /* Make sure refresh sees the new list of publications. */
753  sub->publications = stmt->publication;
754 
755  AlterSubscription_refresh(sub, copy_data);
756  }
757 
758  break;
759  }
760 
762  {
763  bool copy_data;
764 
765  if (!sub->enabled)
766  ereport(ERROR,
767  (errcode(ERRCODE_SYNTAX_ERROR),
768  errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
769 
770  parse_subscription_options(stmt->options, NULL, NULL, NULL,
771  NULL, NULL, NULL, &copy_data,
772  NULL, NULL);
773 
774  AlterSubscription_refresh(sub, copy_data);
775 
776  break;
777  }
778 
779  default:
780  elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
781  stmt->kind);
782  }
783 
784  /* Update the catalog if needed. */
785  if (update_tuple)
786  {
787  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
788  replaces);
789 
790  CatalogTupleUpdate(rel, &tup->t_self, tup);
791 
792  heap_freetuple(tup);
793  }
794 
796 
798 
800 
801  return myself;
802 }
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:46
int errhint(const char *fmt,...)
Definition: elog.c:987
#define Anum_pg_subscription_subpublications
#define RelationGetDescr(relation)
Definition: rel.h:428
Oid GetUserId(void)
Definition: miscinit.c:284
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:244
int errcode(int sqlerrcode)
Definition: elog.c:575
static Datum publicationListToArray(List *publist)
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:585
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
unsigned int Oid
Definition: postgres_ext.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define ERROR
Definition: elog.h:43
#define Natts_pg_subscription
ItemPointerData t_self
Definition: htup.h:65
#define SubscriptionRelationId
AlterSubscriptionType kind
Definition: parsenodes.h:3420
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3399
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:584
List * publications
int synchronous_commit
Definition: xact.c:82
#define ereport(elevel, rest)
Definition: elog.h:122
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:163
#define Anum_pg_subscription_subslotname
uintptr_t Datum
Definition: postgres.h:372
Oid MyDatabaseId
Definition: globals.c:77
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:871
#define BoolGetDatum(X)
Definition: postgres.h:408
#define Anum_pg_subscription_subconninfo
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define Assert(condition)
Definition: c.h:664
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:210
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define Anum_pg_subscription_subenabled
static Datum values[MAXATTR]
Definition: bootstrap.c:164
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CStringGetTextDatum(s)
Definition: builtins.h:91
#define elog
Definition: elog.h:219
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:863
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:170
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:794
#define Anum_pg_subscription_subsynccommit
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
static void AlterSubscription_refresh(Subscription *sub, bool copy_data)
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5110
ObjectAddress AlterSubscriptionOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 1052 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, heap_close, heap_freetuple(), heap_open(), HeapTupleGetOid, HeapTupleIsValid, MyDatabaseId, ObjectAddressSet, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONNAME, and SubscriptionRelationId.

Referenced by ExecAlterOwnerStmt().

1053 {
1054  Oid subid;
1055  HeapTuple tup;
1056  Relation rel;
1057  ObjectAddress address;
1058 
1060 
1063 
1064  if (!HeapTupleIsValid(tup))
1065  ereport(ERROR,
1066  (errcode(ERRCODE_UNDEFINED_OBJECT),
1067  errmsg("subscription \"%s\" does not exist", name)));
1068 
1069  subid = HeapTupleGetOid(tup);
1070 
1071  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1072 
1073  ObjectAddressSet(address, SubscriptionRelationId, subid);
1074 
1075  heap_freetuple(tup);
1076 
1078 
1079  return address;
1080 }
int errcode(int sqlerrcode)
Definition: elog.c:575
#define heap_close(r, l)
Definition: heapam.h:97
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define SubscriptionRelationId
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:584
#define ereport(elevel, rest)
Definition: elog.h:122
Oid MyDatabaseId
Definition: globals.c:77
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
const char * name
Definition: encode.c:521
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:170
void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 1086 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_close, heap_freetuple(), heap_open(), HeapTupleIsValid, ObjectIdGetDatum, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, and SubscriptionRelationId.

Referenced by shdepReassignOwned().

1087 {
1088  HeapTuple tup;
1089  Relation rel;
1090 
1092 
1094 
1095  if (!HeapTupleIsValid(tup))
1096  ereport(ERROR,
1097  (errcode(ERRCODE_UNDEFINED_OBJECT),
1098  errmsg("subscription with OID %u does not exist", subid)));
1099 
1100  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1101 
1102  heap_freetuple(tup);
1103 
1105 }
int errcode(int sqlerrcode)
Definition: elog.c:575
#define heap_close(r, l)
Definition: heapam.h:97
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
#define SubscriptionRelationId
#define RowExclusiveLock
Definition: lockdefs.h:38
#define ereport(elevel, rest)
Definition: elog.h:122
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:168
int errmsg(const char *fmt,...)
Definition: elog.c:797
ObjectAddress CreateSubscription ( CreateSubscriptionStmt * stmt,
bool  isTopLevel 
)

Definition at line 304 of file subscriptioncmds.c.

References AccessShareLock, Anum_pg_subscription_subconninfo, Anum_pg_subscription_subdbid, Anum_pg_subscription_subenabled, Anum_pg_subscription_subname, Anum_pg_subscription_subowner, Anum_pg_subscription_subpublications, Anum_pg_subscription_subslotname, Anum_pg_subscription_subsynccommit, ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleInsert(), CheckSubscriptionRelkind(), connect, CreateSubscriptionStmt::conninfo, CRS_NOEXPORT_SNAPSHOT, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, fetch_table_list(), get_rel_relkind(), GetSysCacheOid2, GetUserId(), heap_close, heap_form_tuple(), heap_freetuple(), heap_open(), InvalidXLogRecPtr, InvokeObjectPostCreateHook, lfirst, load_file(), MyDatabaseId, NAMEDATALEN, namein(), Natts_pg_subscription, NOTICE, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, CreateSubscriptionStmt::options, parse_subscription_options(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, PreventTransactionChain(), CreateSubscriptionStmt::publication, publicationListToArray(), RangeVarGetRelid, recordDependencyOnOwner(), RelationGetDescr, RangeVar::relname, replorigin_create(), RowExclusiveLock, RangeVar::schemaname, SetSubscriptionRelState(), snprintf(), CreateSubscriptionStmt::subname, SUBREL_STATE_INIT, SUBREL_STATE_READY, SUBSCRIPTIONNAME, SubscriptionRelationId, superuser(), synchronous_commit, values, walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, WARNING, and wrconn.

Referenced by ProcessUtilitySlow().

305 {
306  Relation rel;
307  ObjectAddress myself;
308  Oid subid;
309  bool nulls[Natts_pg_subscription];
311  Oid owner = GetUserId();
312  HeapTuple tup;
313  bool connect;
314  bool enabled_given;
315  bool enabled;
316  bool copy_data;
317  char *synchronous_commit;
318  char *conninfo;
319  char *slotname;
320  bool slotname_given;
321  char originname[NAMEDATALEN];
322  bool create_slot;
323  List *publications;
324 
325  /*
326  * Parse and check options.
327  *
328  * Connection and publication should not be specified here.
329  */
330  parse_subscription_options(stmt->options, &connect, &enabled_given,
331  &enabled, &create_slot, &slotname_given,
332  &slotname, &copy_data, &synchronous_commit,
333  NULL);
334 
335  /*
336  * Since creating a replication slot is not transactional, rolling back
337  * the transaction leaves the created replication slot. So we cannot run
338  * CREATE SUBSCRIPTION inside a transaction block if creating a
339  * replication slot.
340  */
341  if (create_slot)
342  PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
343 
344  if (!superuser())
345  ereport(ERROR,
346  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
347  (errmsg("must be superuser to create subscriptions"))));
348 
350 
351  /* Check if name is used */
353  CStringGetDatum(stmt->subname));
354  if (OidIsValid(subid))
355  {
356  ereport(ERROR,
358  errmsg("subscription \"%s\" already exists",
359  stmt->subname)));
360  }
361 
362  if (!slotname_given && slotname == NULL)
363  slotname = stmt->subname;
364 
365  /* The default for synchronous_commit of subscriptions is off. */
366  if (synchronous_commit == NULL)
367  synchronous_commit = "off";
368 
369  conninfo = stmt->conninfo;
370  publications = stmt->publication;
371 
372  /* Load the library providing us libpq calls. */
373  load_file("libpqwalreceiver", false);
374 
375  /* Check the connection info string. */
376  walrcv_check_conninfo(conninfo);
377 
378  /* Everything ok, form a new tuple. */
379  memset(values, 0, sizeof(values));
380  memset(nulls, false, sizeof(nulls));
381 
383  values[Anum_pg_subscription_subname - 1] =
385  values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
386  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
388  CStringGetTextDatum(conninfo);
389  if (slotname)
392  else
393  nulls[Anum_pg_subscription_subslotname - 1] = true;
395  CStringGetTextDatum(synchronous_commit);
397  publicationListToArray(publications);
398 
399  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
400 
401  /* Insert tuple into catalog. */
402  subid = CatalogTupleInsert(rel, tup);
403  heap_freetuple(tup);
404 
406 
407  snprintf(originname, sizeof(originname), "pg_%u", subid);
408  replorigin_create(originname);
409 
410  /*
411  * Connect to remote side to execute requested commands and fetch table
412  * info.
413  */
414  if (connect)
415  {
416  XLogRecPtr lsn;
417  char *err;
419  List *tables;
420  ListCell *lc;
421  char table_state;
422 
423  /* Try to connect to the publisher. */
424  wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
425  if (!wrconn)
426  ereport(ERROR,
427  (errmsg("could not connect to the publisher: %s", err)));
428 
429  PG_TRY();
430  {
431  /*
432  * Set sync state based on if we were asked to do data copy or
433  * not.
434  */
435  table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
436 
437  /*
438  * Get the table list from publisher and build local table status
439  * info.
440  */
441  tables = fetch_table_list(wrconn, publications);
442  foreach(lc, tables)
443  {
444  RangeVar *rv = (RangeVar *) lfirst(lc);
445  Oid relid;
446 
447  relid = RangeVarGetRelid(rv, AccessShareLock, false);
448 
449  /* Check for supported relkind. */
451  rv->schemaname, rv->relname);
452 
453  SetSubscriptionRelState(subid, relid, table_state,
454  InvalidXLogRecPtr, false);
455  }
456 
457  /*
458  * If requested, create permanent slot for the subscription. We
459  * won't use the initial snapshot for anything, so no need to
460  * export it.
461  */
462  if (create_slot)
463  {
464  Assert(slotname);
465 
466  walrcv_create_slot(wrconn, slotname, false,
467  CRS_NOEXPORT_SNAPSHOT, &lsn);
468  ereport(NOTICE,
469  (errmsg("created replication slot \"%s\" on publisher",
470  slotname)));
471  }
472  }
473  PG_CATCH();
474  {
475  /* Close the connection in case of failure. */
476  walrcv_disconnect(wrconn);
477  PG_RE_THROW();
478  }
479  PG_END_TRY();
480 
481  /* And we are done with the remote side. */
482  walrcv_disconnect(wrconn);
483  }
484  else
486  (errmsg("tables were not subscribed, you will have to run "
487  "ALTER SUBSCRIPTION ... REFRESH PUBLICATION to "
488  "subscribe the tables")));
489 
491 
492  if (enabled)
494 
496 
498 
499  return myself;
500 }
#define connect(s, name, namelen)
Definition: win32.h:373
#define SUBREL_STATE_INIT
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
WalReceiverConn * wrconn
Definition: worker.c:110
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:46
#define Anum_pg_subscription_subpublications
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:145
#define Anum_pg_subscription_subowner
#define RelationGetDescr(relation)
Definition: rel.h:428
Oid GetUserId(void)
Definition: miscinit.c:284
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:53
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:244
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1801
#define AccessShareLock
Definition: lockdefs.h:36
int errcode(int sqlerrcode)
Definition: elog.c:575
bool superuser(void)
Definition: superuser.c:47
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
static Datum publicationListToArray(List *publist)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:695
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:585
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:159
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1373
unsigned int Oid
Definition: postgres_ext.h:31
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:260
#define OidIsValid(objectId)
Definition: c.h:532
#define GetSysCacheOid2(cacheId, key1, key2)
Definition: syscache.h:188
char * schemaname
Definition: primnodes.h:67
#define NAMEDATALEN
char * relname
Definition: primnodes.h:68
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:162
#define Natts_pg_subscription
#define Anum_pg_subscription_subname
#define SubscriptionRelationId
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:584
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
int synchronous_commit
Definition: xact.c:82
#define ereport(elevel, rest)
Definition: elog.h:122
#define Anum_pg_subscription_subslotname
#define Anum_pg_subscription_subdbid
#define WARNING
Definition: elog.h:40
Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool update_only)
uintptr_t Datum
Definition: postgres.h:372
Oid MyDatabaseId
Definition: globals.c:77
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
#define BoolGetDatum(X)
Definition: postgres.h:408
#define NOTICE
Definition: elog.h:37
#define Anum_pg_subscription_subconninfo
#define PG_CATCH()
Definition: elog.h:293
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:664
#define lfirst(lc)
Definition: pg_list.h:106
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
#define PG_RE_THROW()
Definition: elog.h:314
#define walrcv_disconnect(conn)
Definition: walreceiver.h:264
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define Anum_pg_subscription_subenabled
static Datum values[MAXATTR]
Definition: bootstrap.c:164
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CStringGetTextDatum(s)
Definition: builtins.h:91
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:863
#define PG_TRY()
Definition: elog.h:284
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:33
Definition: pg_list.h:45
#define SUBREL_STATE_READY
#define Anum_pg_subscription_subsynccommit
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
#define PG_END_TRY()
Definition: elog.h:300
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3161
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
void DropSubscription ( DropSubscriptionStmt * stmt,
bool  isTopLevel 
)

Definition at line 808 of file subscriptioncmds.c.

References AccessExclusiveLock, ACL_KIND_SUBSCRIPTION, aclcheck_error(), ACLCHECK_NOT_OWNER, Anum_pg_subscription_subconninfo, Anum_pg_subscription_subname, Anum_pg_subscription_subslotname, appendStringInfo(), Assert, CatalogTupleDelete(), CStringGetDatum, StringInfoData::data, DatumGetName, deleteSharedDependencyRecordsFor(), ereport, WalRcvExecResult::err, errcode(), errdetail(), errhint(), errmsg(), ERROR, EventTriggerSQLDropAddObject(), GetUserId(), heap_close, heap_open(), HeapTupleGetOid, HeapTupleIsValid, initStringInfo(), InvalidOid, InvalidRepOriginId, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), DropSubscriptionStmt::missing_ok, MyDatabaseId, NAMEDATALEN, NameStr, NoLock, NOTICE, ObjectAddressSet, pfree(), PG_CATCH, PG_END_TRY, PG_RE_THROW, pg_subscription_ownercheck(), PG_TRY, PreventTransactionChain(), pstrdup(), quote_identifier(), ReleaseSysCache(), LogicalRepWorker::relid, RemoveSubscriptionRel(), replorigin_by_name(), replorigin_drop(), SearchSysCache2, snprintf(), WalRcvExecResult::status, LogicalRepWorker::subid, DropSubscriptionStmt::subname, SUBSCRIPTIONNAME, SUBSCRIPTIONOID, SubscriptionRelationId, SysCacheGetAttr(), HeapTupleData::t_self, TextDatumGetCString, walrcv_clear_result(), walrcv_connect, walrcv_disconnect, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by ProcessUtilitySlow().

809 {
810  Relation rel;
811  ObjectAddress myself;
812  HeapTuple tup;
813  Oid subid;
814  Datum datum;
815  bool isnull;
816  char *subname;
817  char *conninfo;
818  char *slotname;
819  List *subworkers;
820  ListCell *lc;
821  char originname[NAMEDATALEN];
822  char *err = NULL;
823  RepOriginId originid;
824  WalReceiverConn *wrconn = NULL;
825  StringInfoData cmd;
826 
827  /*
828  * Lock pg_subscription with AccessExclusiveLock to ensure that the
829  * launcher doesn't restart new worker during dropping the subscription
830  */
832 
834  CStringGetDatum(stmt->subname));
835 
836  if (!HeapTupleIsValid(tup))
837  {
838  heap_close(rel, NoLock);
839 
840  if (!stmt->missing_ok)
841  ereport(ERROR,
842  (errcode(ERRCODE_UNDEFINED_OBJECT),
843  errmsg("subscription \"%s\" does not exist",
844  stmt->subname)));
845  else
846  ereport(NOTICE,
847  (errmsg("subscription \"%s\" does not exist, skipping",
848  stmt->subname)));
849 
850  return;
851  }
852 
853  subid = HeapTupleGetOid(tup);
854 
855  /* must be owner */
856  if (!pg_subscription_ownercheck(subid, GetUserId()))
858  stmt->subname);
859 
860  /* DROP hook for the subscription being removed */
862 
863  /*
864  * Lock the subscription so nobody else can do anything with it (including
865  * the replication workers).
866  */
868 
869  /* Get subname */
870  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
872  Assert(!isnull);
873  subname = pstrdup(NameStr(*DatumGetName(datum)));
874 
875  /* Get conninfo */
876  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
878  Assert(!isnull);
879  conninfo = TextDatumGetCString(datum);
880 
881  /* Get slotname */
882  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
884  if (!isnull)
885  slotname = pstrdup(NameStr(*DatumGetName(datum)));
886  else
887  slotname = NULL;
888 
889  /*
890  * Since dropping a replication slot is not transactional, the replication
891  * slot stays dropped even if the transaction rolls back. So we cannot
892  * run DROP SUBSCRIPTION inside a transaction block if dropping the
893  * replication slot.
894  *
895  * XXX The command name should really be something like "DROP SUBSCRIPTION
896  * of a subscription that is associated with a replication slot", but we
897  * don't have the proper facilities for that.
898  */
899  if (slotname)
900  PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
901 
902 
904  EventTriggerSQLDropAddObject(&myself, true, true);
905 
906  /* Remove the tuple from catalog. */
907  CatalogTupleDelete(rel, &tup->t_self);
908 
909  ReleaseSysCache(tup);
910 
911  /*
912  * Stop all the subscription workers immediately.
913  *
914  * This is necessary if we are dropping the replication slot, so that the
915  * slot becomes accessible.
916  *
917  * It is also necessary if the subscription is disabled and was disabled
918  * in the same transaction. Then the workers haven't seen the disabling
919  * yet and will still be running, leading to hangs later when we want to
920  * drop the replication origin. If the subscription was disabled before
921  * this transaction, then there shouldn't be any workers left, so this
922  * won't make a difference.
923  *
924  * New workers won't be started because we hold an exclusive lock on the
925  * subscription till the end of the transaction.
926  */
927  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
928  subworkers = logicalrep_workers_find(subid, false);
929  LWLockRelease(LogicalRepWorkerLock);
930  foreach(lc, subworkers)
931  {
933 
935  }
936  list_free(subworkers);
937 
938  /* Clean up dependencies */
940 
941  /* Remove any associated relation synchronization states. */
943 
944  /* Remove the origin tracking if exists. */
945  snprintf(originname, sizeof(originname), "pg_%u", subid);
946  originid = replorigin_by_name(originname, true);
947  if (originid != InvalidRepOriginId)
948  replorigin_drop(originid, false);
949 
950  /*
951  * If there is no slot associated with the subscription, we can finish
952  * here.
953  */
954  if (!slotname)
955  {
956  heap_close(rel, NoLock);
957  return;
958  }
959 
960  /*
961  * Otherwise drop the replication slot at the publisher node using the
962  * replication connection.
963  */
964  load_file("libpqwalreceiver", false);
965 
966  initStringInfo(&cmd);
967  appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
968 
969  wrconn = walrcv_connect(conninfo, true, subname, &err);
970  if (wrconn == NULL)
971  ereport(ERROR,
972  (errmsg("could not connect to publisher when attempting to "
973  "drop the replication slot \"%s\"", slotname),
974  errdetail("The error was: %s", err),
975  errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
976  "to disassociate the subscription from the slot.")));
977 
978  PG_TRY();
979  {
980  WalRcvExecResult *res;
981 
982  res = walrcv_exec(wrconn, cmd.data, 0, NULL);
983 
984  if (res->status != WALRCV_OK_COMMAND)
985  ereport(ERROR,
986  (errmsg("could not drop the replication slot \"%s\" on publisher",
987  slotname),
988  errdetail("The error was: %s", res->err)));
989  else
990  ereport(NOTICE,
991  (errmsg("dropped replication slot \"%s\" on publisher",
992  slotname)));
993 
994  walrcv_clear_result(res);
995  }
996  PG_CATCH();
997  {
998  /* Close the connection in case of failure */
999  walrcv_disconnect(wrconn);
1000  PG_RE_THROW();
1001  }
1002  PG_END_TRY();
1003 
1004  walrcv_disconnect(wrconn);
1005 
1006  pfree(cmd.data);
1007 
1008  heap_close(rel, NoLock);
1009 }
WalReceiverConn * wrconn
Definition: worker.c:110
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:262
int errhint(const char *fmt,...)
Definition: elog.c:987
void RemoveSubscriptionRel(Oid subid, Oid relid)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10390
Oid GetUserId(void)
Definition: miscinit.c:284
char * pstrdup(const char *in)
Definition: mcxt.c:1076
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:332
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:154
uint16 RepOriginId
Definition: xlogdefs.h:51
int errcode(int sqlerrcode)
Definition: elog.c:575
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:255
#define heap_close(r, l)
Definition: heapam.h:97
unsigned int Oid
Definition: postgres_ext.h:31
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1721
#define NAMEDATALEN
#define DatumGetName(X)
Definition: postgres.h:591
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:268
void pfree(void *pointer)
Definition: mcxt.c:949
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
#define ERROR
Definition: elog.h:43
#define Anum_pg_subscription_subname
ItemPointerData t_self
Definition: htup.h:65
#define SubscriptionRelationId
#define NoLock
Definition: lockdefs.h:34
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3399
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:454
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define CStringGetDatum(X)
Definition: postgres.h:584
#define ereport(elevel, rest)
Definition: elog.h:122
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:823
#define Anum_pg_subscription_subslotname
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define TextDatumGetCString(d)
Definition: builtins.h:92
uintptr_t Datum
Definition: postgres.h:372
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1117
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1325
Oid MyDatabaseId
Definition: globals.c:77
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:871
#define InvalidOid
Definition: postgres_ext.h:36
#define NOTICE
Definition: elog.h:37
#define Anum_pg_subscription_subconninfo
#define PG_CATCH()
Definition: elog.h:293
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define Assert(condition)
Definition: c.h:664
#define lfirst(lc)
Definition: pg_list.h:106
WalRcvExecStatus status
Definition: walreceiver.h:187
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1117
#define PG_RE_THROW()
Definition: elog.h:314
#define walrcv_disconnect(conn)
Definition: walreceiver.h:264
#define InvalidRepOriginId
Definition: origin.h:34
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:797
void list_free(List *list)
Definition: list.c:1133
#define NameStr(name)
Definition: c.h:493
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
#define PG_TRY()
Definition: elog.h:284
Definition: pg_list.h:45
#define PG_END_TRY()
Definition: elog.h:300
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:262
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3161
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5110
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
#define SearchSysCache2(cacheId, key1, key2)
Definition: syscache.h:161