PostgreSQL Source Code  git master
subscriptioncmds.h File Reference
Include dependency graph for subscriptioncmds.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

ObjectAddress CreateSubscription (CreateSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscription (AlterSubscriptionStmt *stmt)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 

Function Documentation

◆ AlterSubscription()

ObjectAddress AlterSubscription ( AlterSubscriptionStmt *  stmt)

Definition at line 622 of file subscriptioncmds.c.

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, AlterSubscription_refresh(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleUpdate(), AlterSubscriptionStmt::conninfo, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, elog, Subscription::enabled, ereport, errcode(), errhint(), errmsg(), ERROR, GETSTRUCT, GetSubscription(), GetUserId(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvokeObjectPostAlterHook, AlterSubscriptionStmt::kind, load_file(), LockSharedObject(), MyDatabaseId, namein(), OBJECT_SUBSCRIPTION, ObjectAddressSet, AlterSubscriptionStmt::options, parse_subscription_options(), pg_subscription_ownercheck(), AlterSubscriptionStmt::publication, publicationListToArray(), Subscription::publications, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, AlterSubscriptionStmt::subname, SUBSCRIPTIONNAME, synchronous_commit, HeapTupleData::t_self, table_close(), table_open(), values, and walrcv_check_conninfo.

Referenced by ProcessUtilitySlow().

623 {
624  Relation rel;
625  ObjectAddress myself;
626  bool nulls[Natts_pg_subscription];
627  bool replaces[Natts_pg_subscription];
628  Datum values[Natts_pg_subscription];
629  HeapTuple tup;
630  Oid subid;
631  bool update_tuple = false;
632  Subscription *sub;
633  Form_pg_subscription form;
634 
635  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
636 
637  /* Fetch the existing tuple. */
638  tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
639  CStringGetDatum(stmt->subname));
640 
641  if (!HeapTupleIsValid(tup))
642  ereport(ERROR,
643  (errcode(ERRCODE_UNDEFINED_OBJECT),
644  errmsg("subscription \"%s\" does not exist",
645  stmt->subname)));
646 
647  form = (Form_pg_subscription) GETSTRUCT(tup);
648  subid = form->oid;
649 
650  /* must be owner */
651  if (!pg_subscription_ownercheck(subid, GetUserId()))
652  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
653  stmt->subname);
654 
655  sub = GetSubscription(subid, false);
656 
657  /* Lock the subscription so nobody else can do anything with it. */
658  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
659 
660  /* Form a new tuple. */
661  memset(values, 0, sizeof(values));
662  memset(nulls, false, sizeof(nulls));
663  memset(replaces, false, sizeof(replaces));
664 
665  switch (stmt->kind)
666  {
667  case ALTER_SUBSCRIPTION_OPTIONS:
668  {
669  char *slotname;
670  bool slotname_given;
671  char *synchronous_commit;
672 
673  parse_subscription_options(stmt->options, NULL, NULL, NULL,
674  NULL, &slotname_given, &slotname,
675  NULL, &synchronous_commit, NULL);
676 
677  if (slotname_given)
678  {
679  if (sub->enabled && !slotname)
680  ereport(ERROR,
681  (errcode(ERRCODE_SYNTAX_ERROR),
682  errmsg("cannot set %s for enabled subscription",
683  "slot_name = NONE")));
684 
685  if (slotname)
686  values[Anum_pg_subscription_subslotname - 1] =
687  DirectFunctionCall1(namein, CStringGetDatum(slotname));
688  else
689  nulls[Anum_pg_subscription_subslotname - 1] = true;
690  replaces[Anum_pg_subscription_subslotname - 1] = true;
691  }
692 
693  if (synchronous_commit)
694  {
695  values[Anum_pg_subscription_subsynccommit - 1] =
696  CStringGetTextDatum(synchronous_commit);
697  replaces[Anum_pg_subscription_subsynccommit - 1] = true;
698  }
699 
700  update_tuple = true;
701  break;
702  }
703 
704  case ALTER_SUBSCRIPTION_ENABLED:
705  {
706  bool enabled,
707  enabled_given;
708 
709  parse_subscription_options(stmt->options, NULL,
710  &enabled_given, &enabled, NULL,
711  NULL, NULL, NULL, NULL, NULL);
712  Assert(enabled_given);
713 
714  if (!sub->slotname && enabled)
715  ereport(ERROR,
716  (errcode(ERRCODE_SYNTAX_ERROR),
717  errmsg("cannot enable subscription that does not have a slot name")));
718 
719  values[Anum_pg_subscription_subenabled - 1] =
720  BoolGetDatum(enabled);
721  replaces[Anum_pg_subscription_subenabled - 1] = true;
722 
723  if (enabled)
724  ApplyLauncherWakeupAtCommit();
725 
726  update_tuple = true;
727  break;
728  }
729 
730  case ALTER_SUBSCRIPTION_CONNECTION:
731  /* Load the library providing us libpq calls. */
732  load_file("libpqwalreceiver", false);
733  /* Check the connection info string. */
734  walrcv_check_conninfo(stmt->conninfo);
735 
736  values[Anum_pg_subscription_subconninfo - 1] =
737  CStringGetTextDatum(stmt->conninfo);
738  replaces[Anum_pg_subscription_subconninfo - 1] = true;
739  update_tuple = true;
740  break;
741 
742  case ALTER_SUBSCRIPTION_PUBLICATION:
743  {
744  bool copy_data;
745  bool refresh;
746 
747  parse_subscription_options(stmt->options, NULL, NULL, NULL,
748  NULL, NULL, NULL, &copy_data,
749  NULL, &refresh);
750 
751  values[Anum_pg_subscription_subpublications - 1] =
752  publicationListToArray(stmt->publication);
753  replaces[Anum_pg_subscription_subpublications - 1] = true;
754 
755  update_tuple = true;
756 
757  /* Refresh if user asked us to. */
758  if (refresh)
759  {
760  if (!sub->enabled)
761  ereport(ERROR,
762  (errcode(ERRCODE_SYNTAX_ERROR),
763  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
764  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
765 
766  /* Make sure refresh sees the new list of publications. */
767  sub->publications = stmt->publication;
768 
769  AlterSubscription_refresh(sub, copy_data);
770  }
771 
772  break;
773  }
774 
775  case ALTER_SUBSCRIPTION_REFRESH:
776  {
777  bool copy_data;
778 
779  if (!sub->enabled)
780  ereport(ERROR,
781  (errcode(ERRCODE_SYNTAX_ERROR),
782  errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
783 
784  parse_subscription_options(stmt->options, NULL, NULL, NULL,
785  NULL, NULL, NULL, &copy_data,
786  NULL, NULL);
787 
788  AlterSubscription_refresh(sub, copy_data);
789 
790  break;
791  }
792 
793  default:
794  elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
795  stmt->kind);
796  }
797 
798  /* Update the catalog if needed. */
799  if (update_tuple)
800  {
801  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
802  replaces);
803 
804  CatalogTupleUpdate(rel, &tup->t_self, tup);
805 
806  heap_freetuple(tup);
807  }
808 
809  table_close(rel, RowExclusiveLock);
810 
811  ObjectAddressSet(myself, SubscriptionRelationId, subid);
812 
813  InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
814 
815  return myself;
816 }
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define RelationGetDescr(relation)
Definition: rel.h:482
Oid GetUserId(void)
Definition: miscinit.c:448
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:274
int errcode(int sqlerrcode)
Definition: elog.c:610
static Datum publicationListToArray(List *publist)
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:624
FormData_pg_subscription * Form_pg_subscription
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3327
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
AlterSubscriptionType kind
Definition: parsenodes.h:3558
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
List * publications
int synchronous_commit
Definition: xact.c:83
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1002
#define BoolGetDatum(X)
Definition: postgres.h:402
#define ereport(elevel,...)
Definition: elog.h:144
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:738
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:224
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
#define CStringGetTextDatum(s)
Definition: builtins.h:87
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:929
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:176
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
static void AlterSubscription_refresh(Subscription *sub, bool copy_data)
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5284

◆ AlterSubscriptionOwner()

ObjectAddress AlterSubscriptionOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 1064 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), HeapTupleIsValid, MyDatabaseId, ObjectAddressSet, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONNAME, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

1065 {
1066  Oid subid;
1067  HeapTuple tup;
1068  Relation rel;
1069  ObjectAddress address;
1070  Form_pg_subscription form;
1071 
1072  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1073 
1074  tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1075  CStringGetDatum(name));
1076 
1077  if (!HeapTupleIsValid(tup))
1078  ereport(ERROR,
1079  (errcode(ERRCODE_UNDEFINED_OBJECT),
1080  errmsg("subscription \"%s\" does not exist", name)));
1081 
1082  form = (Form_pg_subscription) GETSTRUCT(tup);
1083  subid = form->oid;
1084 
1085  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1086 
1087  ObjectAddressSet(address, SubscriptionRelationId, subid);
1088 
1089  heap_freetuple(tup);
1090 
1091  table_close(rel, RowExclusiveLock);
1092 
1093  return address;
1094 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
int errcode(int sqlerrcode)
Definition: elog.c:610
FormData_pg_subscription * Form_pg_subscription
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
Oid MyDatabaseId
Definition: globals.c:85
#define ereport(elevel,...)
Definition: elog.h:144
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
const char * name
Definition: encode.c:555
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:176
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterSubscriptionOwner_oid()

void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 1100 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, table_close(), and table_open().

Referenced by shdepReassignOwned().

1101 {
1102  HeapTuple tup;
1103  Relation rel;
1104 
1105  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1106 
1107  tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
1108 
1109  if (!HeapTupleIsValid(tup))
1110  ereport(ERROR,
1111  (errcode(ERRCODE_UNDEFINED_OBJECT),
1112  errmsg("subscription with OID %u does not exist", subid)));
1113 
1114  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1115 
1116  heap_freetuple(tup);
1117 
1118  table_close(rel, RowExclusiveLock);
1119 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errcode(int sqlerrcode)
Definition: elog.c:610
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define ereport(elevel,...)
Definition: elog.h:144
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:824
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ CreateSubscription()

ObjectAddress CreateSubscription ( CreateSubscriptionStmt *  stmt,
bool  isTopLevel 
)

Definition at line 308 of file subscriptioncmds.c.

References AccessShareLock, AddSubscriptionRelState(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleInsert(), CheckSubscriptionRelkind(), connect, CreateSubscriptionStmt::conninfo, create_slot, CRS_NOEXPORT_SNAPSHOT, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, elog, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, fetch_table_list(), get_rel_relkind(), GetNewOidWithIndex(), GetSysCacheOid2, GetUserId(), heap_form_tuple(), heap_freetuple(), InvalidXLogRecPtr, InvokeObjectPostCreateHook, lfirst, load_file(), MyDatabaseId, NAMEDATALEN, namein(), NOTICE, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, CreateSubscriptionStmt::options, parse_subscription_options(), PG_END_TRY, PG_FINALLY, PG_TRY, PreventInTransactionBlock(), CreateSubscriptionStmt::publication, publicationListToArray(), RangeVarGetRelid, recordDependencyOnOwner(), RelationGetDescr, RangeVar::relname, replorigin_create(), RowExclusiveLock, RangeVar::schemaname, snprintf, CreateSubscriptionStmt::subname, SUBSCRIPTIONNAME, SubscriptionObjectIndexId, superuser(), synchronous_commit, table_close(), table_open(), values, walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, WARNING, and wrconn.

Referenced by ProcessUtilitySlow().

309 {
310  Relation rel;
311  ObjectAddress myself;
312  Oid subid;
313  bool nulls[Natts_pg_subscription];
314  Datum values[Natts_pg_subscription];
315  Oid owner = GetUserId();
316  HeapTuple tup;
317  bool connect;
318  bool enabled_given;
319  bool enabled;
320  bool copy_data;
321  char *synchronous_commit;
322  char *conninfo;
323  char *slotname;
324  bool slotname_given;
325  char originname[NAMEDATALEN];
326  bool create_slot;
327  List *publications;
328 
329  /*
330  * Parse and check options.
331  *
332  * Connection and publication should not be specified here.
333  */
334  parse_subscription_options(stmt->options, &connect, &enabled_given,
335  &enabled, &create_slot, &slotname_given,
336  &slotname, &copy_data, &synchronous_commit,
337  NULL);
338 
339  /*
340  * Since creating a replication slot is not transactional, rolling back
341  * the transaction leaves the created replication slot. So we cannot run
342  * CREATE SUBSCRIPTION inside a transaction block if creating a
343  * replication slot.
344  */
345  if (create_slot)
346  PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
347 
348  if (!superuser())
349  ereport(ERROR,
350  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
351  errmsg("must be superuser to create subscriptions")));
352 
353  /*
354  * If built with appropriate switch, whine when regression-testing
355  * conventions for subscription names are violated.
356  */
357 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
358  if (strncmp(stmt->subname, "regress_", 8) != 0)
359  elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
360 #endif
361 
362  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
363 
364  /* Check if name is used */
365  subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
366  MyDatabaseId, CStringGetDatum(stmt->subname));
367  if (OidIsValid(subid))
368  {
369  ereport(ERROR,
370  (errcode(ERRCODE_DUPLICATE_OBJECT),
371  errmsg("subscription \"%s\" already exists",
372  stmt->subname)));
373  }
374 
375  if (!slotname_given && slotname == NULL)
376  slotname = stmt->subname;
377 
378  /* The default for synchronous_commit of subscriptions is off. */
379  if (synchronous_commit == NULL)
380  synchronous_commit = "off";
381 
382  conninfo = stmt->conninfo;
383  publications = stmt->publication;
384 
385  /* Load the library providing us libpq calls. */
386  load_file("libpqwalreceiver", false);
387 
388  /* Check the connection info string. */
389  walrcv_check_conninfo(conninfo);
390 
391  /* Everything ok, form a new tuple. */
392  memset(values, 0, sizeof(values));
393  memset(nulls, false, sizeof(nulls));
394 
395  subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
396  Anum_pg_subscription_oid);
397  values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
398  values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
399  values[Anum_pg_subscription_subname - 1] =
400  DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
401  values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
402  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
403  values[Anum_pg_subscription_subconninfo - 1] =
404  CStringGetTextDatum(conninfo);
405  if (slotname)
406  values[Anum_pg_subscription_subslotname - 1] =
407  DirectFunctionCall1(namein, CStringGetDatum(slotname));
408  else
409  nulls[Anum_pg_subscription_subslotname - 1] = true;
410  values[Anum_pg_subscription_subsynccommit - 1] =
411  CStringGetTextDatum(synchronous_commit);
412  values[Anum_pg_subscription_subpublications - 1] =
413  publicationListToArray(publications);
414 
415  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
416 
417  /* Insert tuple into catalog. */
418  CatalogTupleInsert(rel, tup);
419  heap_freetuple(tup);
420 
421  recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
422 
423  snprintf(originname, sizeof(originname), "pg_%u", subid);
424  replorigin_create(originname);
425 
426  /*
427  * Connect to remote side to execute requested commands and fetch table
428  * info.
429  */
430  if (connect)
431  {
432  char *err;
433  WalReceiverConn *wrconn;
434  List *tables;
435  ListCell *lc;
436  char table_state;
437 
438  /* Try to connect to the publisher. */
439  wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
440  if (!wrconn)
441  ereport(ERROR,
442  (errmsg("could not connect to the publisher: %s", err)));
443 
444  PG_TRY();
445  {
446  /*
447  * Set sync state based on if we were asked to do data copy or
448  * not.
449  */
450  table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
451 
452  /*
453  * Get the table list from publisher and build local table status
454  * info.
455  */
456  tables = fetch_table_list(wrconn, publications);
457  foreach(lc, tables)
458  {
459  RangeVar *rv = (RangeVar *) lfirst(lc);
460  Oid relid;
461 
462  relid = RangeVarGetRelid(rv, AccessShareLock, false);
463 
464  /* Check for supported relkind. */
465  CheckSubscriptionRelkind(get_rel_relkind(relid),
466  rv->schemaname, rv->relname);
467 
468  AddSubscriptionRelState(subid, relid, table_state,
469  InvalidXLogRecPtr);
470  }
471 
472  /*
473  * If requested, create permanent slot for the subscription. We
474  * won't use the initial snapshot for anything, so no need to
475  * export it.
476  */
477  if (create_slot)
478  {
479  Assert(slotname);
480 
481  walrcv_create_slot(wrconn, slotname, false,
482  CRS_NOEXPORT_SNAPSHOT, NULL);
483  ereport(NOTICE,
484  (errmsg("created replication slot \"%s\" on publisher",
485  slotname)));
486  }
487  }
488  PG_FINALLY();
489  {
490  walrcv_disconnect(wrconn);
491  }
492  PG_END_TRY();
493  }
494  else
495  ereport(WARNING,
496  /* translator: %s is an SQL ALTER statement */
497  (errmsg("tables were not subscribed, you will have to run %s to subscribe the tables",
498  "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
499 
500  table_close(rel, RowExclusiveLock);
501 
502  if (enabled)
503  ApplyLauncherWakeupAtCommit();
504 
505  ObjectAddressSet(myself, SubscriptionRelationId, subid);
506 
507  InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
508 
509  return myself;
510 }
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:317
WalReceiverConn * wrconn
Definition: worker.c:105
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
#define RelationGetDescr(relation)
Definition: rel.h:482
Oid GetUserId(void)
Definition: miscinit.c:448
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:78
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:274
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1866
#define AccessShareLock
Definition: lockdefs.h:36
static bool create_slot
int errcode(int sqlerrcode)
Definition: elog.c:610
bool superuser(void)
Definition: superuser.c:46
static Datum publicationListToArray(List *publist)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define connect(s, name, namelen)
Definition: win32_port.h:435
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:624
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:164
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:294
#define OidIsValid(objectId)
Definition: c.h:644
char * schemaname
Definition: primnodes.h:67
#define NAMEDATALEN
char * relname
Definition: primnodes.h:68
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3350
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
RepOriginId replorigin_create(char *roname)
Definition: origin.c:245
int synchronous_commit
Definition: xact.c:83
#define WARNING
Definition: elog.h:40
#define PG_FINALLY()
Definition: elog.h:312
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
#define BoolGetDatum(X)
Definition: postgres.h:402
#define ereport(elevel,...)
Definition: elog.h:144
#define NOTICE
Definition: elog.h:37
#define Assert(condition)
Definition: c.h:738
#define lfirst(lc)
Definition: pg_list.h:190
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:194
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define walrcv_disconnect(conn)
Definition: walreceiver.h:300
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:167
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
#define CStringGetTextDatum(s)
Definition: builtins.h:87
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:929
#define PG_TRY()
Definition: elog.h:295
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:31
List
Definition: pg_list.h:50
#define snprintf
Definition: port.h:193
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:183
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
#define PG_END_TRY()
Definition: elog.h:320
#define SubscriptionObjectIndexId
Definition: indexing.h:358
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:272

◆ DropSubscription()

void DropSubscription ( DropSubscriptionStmt *  stmt,
bool  isTopLevel 
)

Definition at line 822 of file subscriptioncmds.c.

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, appendStringInfo(), Assert, CatalogTupleDelete(), CStringGetDatum, StringInfoData::data, DatumGetName, deleteSharedDependencyRecordsFor(), ereport, WalRcvExecResult::err, errcode(), errdetail(), errhint(), errmsg(), ERROR, EventTriggerSQLDropAddObject(), GETSTRUCT, GetUserId(), HeapTupleIsValid, initStringInfo(), InvalidOid, InvalidRepOriginId, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), DropSubscriptionStmt::missing_ok, MyDatabaseId, NAMEDATALEN, NameStr, NoLock, NOTICE, OBJECT_SUBSCRIPTION, ObjectAddressSet, pfree(), PG_END_TRY, PG_FINALLY, pg_subscription_ownercheck(), PG_TRY, PreventInTransactionBlock(), pstrdup(), quote_identifier(), ReleaseSysCache(), LogicalRepWorker::relid, RemoveSubscriptionRel(), replorigin_by_name(), replorigin_drop(), SearchSysCache2(), snprintf, WalRcvExecResult::status, LogicalRepWorker::subid, subname, DropSubscriptionStmt::subname, SUBSCRIPTIONNAME, SUBSCRIPTIONOID, SysCacheGetAttr(), HeapTupleData::t_self, table_close(), table_open(), TextDatumGetCString, walrcv_clear_result(), walrcv_connect, walrcv_disconnect, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by ProcessUtilitySlow().

823 {
824  Relation rel;
825  ObjectAddress myself;
826  HeapTuple tup;
827  Oid subid;
828  Datum datum;
829  bool isnull;
830  char *subname;
831  char *conninfo;
832  char *slotname;
833  List *subworkers;
834  ListCell *lc;
835  char originname[NAMEDATALEN];
836  char *err = NULL;
837  RepOriginId originid;
838  WalReceiverConn *wrconn = NULL;
839  StringInfoData cmd;
840  Form_pg_subscription form;
841 
842  /*
843  * Lock pg_subscription with AccessExclusiveLock to ensure that the
844  * launcher doesn't restart new worker during dropping the subscription
845  */
846  rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
847 
848  tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
849  CStringGetDatum(stmt->subname));
850 
851  if (!HeapTupleIsValid(tup))
852  {
853  table_close(rel, NoLock);
854 
855  if (!stmt->missing_ok)
856  ereport(ERROR,
857  (errcode(ERRCODE_UNDEFINED_OBJECT),
858  errmsg("subscription \"%s\" does not exist",
859  stmt->subname)));
860  else
861  ereport(NOTICE,
862  (errmsg("subscription \"%s\" does not exist, skipping",
863  stmt->subname)));
864 
865  return;
866  }
867 
868  form = (Form_pg_subscription) GETSTRUCT(tup);
869  subid = form->oid;
870 
871  /* must be owner */
872  if (!pg_subscription_ownercheck(subid, GetUserId()))
873  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
874  stmt->subname);
875 
876  /* DROP hook for the subscription being removed */
877  InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
878 
879  /*
880  * Lock the subscription so nobody else can do anything with it (including
881  * the replication workers).
882  */
883  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
884 
885  /* Get subname */
886  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
887  Anum_pg_subscription_subname, &isnull);
888  Assert(!isnull);
889  subname = pstrdup(NameStr(*DatumGetName(datum)));
890 
891  /* Get conninfo */
892  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
893  Anum_pg_subscription_subconninfo, &isnull);
894  Assert(!isnull);
895  conninfo = TextDatumGetCString(datum);
896 
897  /* Get slotname */
898  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
899  Anum_pg_subscription_subslotname, &isnull);
900  if (!isnull)
901  slotname = pstrdup(NameStr(*DatumGetName(datum)));
902  else
903  slotname = NULL;
904 
905  /*
906  * Since dropping a replication slot is not transactional, the replication
907  * slot stays dropped even if the transaction rolls back. So we cannot
908  * run DROP SUBSCRIPTION inside a transaction block if dropping the
909  * replication slot.
910  *
911  * XXX The command name should really be something like "DROP SUBSCRIPTION
912  * of a subscription that is associated with a replication slot", but we
913  * don't have the proper facilities for that.
914  */
915  if (slotname)
916  PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
917 
918  ObjectAddressSet(myself, SubscriptionRelationId, subid);
919  EventTriggerSQLDropAddObject(&myself, true, true);
920 
921  /* Remove the tuple from catalog. */
922  CatalogTupleDelete(rel, &tup->t_self);
923 
924  ReleaseSysCache(tup);
925 
926  /*
927  * Stop all the subscription workers immediately.
928  *
929  * This is necessary if we are dropping the replication slot, so that the
930  * slot becomes accessible.
931  *
932  * It is also necessary if the subscription is disabled and was disabled
933  * in the same transaction. Then the workers haven't seen the disabling
934  * yet and will still be running, leading to hangs later when we want to
935  * drop the replication origin. If the subscription was disabled before
936  * this transaction, then there shouldn't be any workers left, so this
937  * won't make a difference.
938  *
939  * New workers won't be started because we hold an exclusive lock on the
940  * subscription till the end of the transaction.
941  */
942  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
943  subworkers = logicalrep_workers_find(subid, false);
944  LWLockRelease(LogicalRepWorkerLock);
945  foreach(lc, subworkers)
946  {
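947  LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
948 
949  logicalrep_worker_stop(w->subid, w->relid);
950  }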
951  list_free(subworkers);
952 
953  /* Clean up dependencies */
954  deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
955 
956  /* Remove any associated relation synchronization states. */
957  RemoveSubscriptionRel(subid, InvalidOid);
958 
959  /* Remove the origin tracking if exists. */
960  snprintf(originname, sizeof(originname), "pg_%u", subid);
961  originid = replorigin_by_name(originname, true);
962  if (originid != InvalidRepOriginId)
963  replorigin_drop(originid, false);
964 
965  /*
966  * If there is no slot associated with the subscription, we can finish
967  * here.
968  */
969  if (!slotname)
970  {
971  table_close(rel, NoLock);
972  return;
973  }
974 
975  /*
976  * Otherwise drop the replication slot at the publisher node using the
977  * replication connection.
978  */
979  load_file("libpqwalreceiver", false);
980 
981  initStringInfo(&cmd);
982  appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
983 
984  wrconn = walrcv_connect(conninfo, true, subname, &err);
985  if (wrconn == NULL)
986  ereport(ERROR,
987  (errmsg("could not connect to publisher when attempting to "
988  "drop the replication slot \"%s\"", slotname),
989  errdetail("The error was: %s", err),
990  /* translator: %s is an SQL ALTER command */
991  errhint("Use %s to disassociate the subscription from the slot.",
992  "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
993 
994  PG_TRY();
995  {
996  WalRcvExecResult *res;
997 
998  res = walrcv_exec(wrconn, cmd.data, 0, NULL);
999 
1000  if (res->status != WALRCV_OK_COMMAND)
1001  ereport(ERROR,
1002  (errmsg("could not drop the replication slot \"%s\" on publisher",
1003  slotname),
1004  errdetail("The error was: %s", res->err)));
1005  else
1006  ereport(NOTICE,
1007  (errmsg("dropped replication slot \"%s\" on publisher",
1008  slotname)));
1009 
1010  walrcv_clear_result(res);
1011  }
1012  PG_FINALLY();
1013  {
1014  walrcv_disconnect(wrconn);
1015  }
1016  PG_END_TRY();
1017 
1018  pfree(cmd.data);
1019 
1020  table_close(rel, NoLock);
1021 }
WalReceiverConn * wrconn
Definition: worker.c:105
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:259
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
void RemoveSubscriptionRel(Oid subid, Oid relid)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10737
Oid GetUserId(void)
Definition: miscinit.c:448
char * pstrdup(const char *in)
Definition: mcxt.c:1186
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:337
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:160
uint16 RepOriginId
Definition: xlogdefs.h:58
int errcode(int sqlerrcode)
Definition: elog.c:610
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:269
FormData_pg_subscription * Form_pg_subscription
unsigned int Oid
Definition: postgres_ext.h:31
NameData subname
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:214
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define NAMEDATALEN
#define DatumGetName(X)
Definition: postgres.h:585
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3327
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:304
void pfree(void *pointer)
Definition: mcxt.c:1056
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define NoLock
Definition: lockdefs.h:34
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:452
int errdetail(const char *fmt,...)
Definition: elog.c:957
#define CStringGetDatum(X)
Definition: postgres.h:578
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3350
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:907
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define PG_FINALLY()
Definition: elog.h:312
#define TextDatumGetCString(d)
Definition: builtins.h:88
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1377
Oid MyDatabaseId
Definition: globals.c:85
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1002
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:144
#define NOTICE
Definition: elog.h:37
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:738
#define lfirst(lc)
Definition: pg_list.h:190
WalRcvExecStatus status
Definition: walreceiver.h:210
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:1127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
#define walrcv_disconnect(conn)
Definition: walreceiver.h:300
#define InvalidRepOriginId
Definition: origin.h:33
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:824
void list_free(List *list)
Definition: list.c:1376
#define NameStr(name)
Definition: c.h:615
#define PG_TRY()
Definition: elog.h:295
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
List
Definition: pg_list.h:50
#define snprintf
Definition: port.h:193
#define PG_END_TRY()
Definition: elog.h:320
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:298
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5284
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:272