PostgreSQL Source Code  git master
subscriptioncmds.h File Reference
Include dependency graph for subscriptioncmds.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

ObjectAddress CreateSubscription (CreateSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscription (AlterSubscriptionStmt *stmt)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 

Function Documentation

◆ AlterSubscription()

ObjectAddress AlterSubscription ( AlterSubscriptionStmt stmt)

Definition at line 669 of file subscriptioncmds.c.

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH, AlterSubscription_refresh(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleUpdate(), AlterSubscriptionStmt::conninfo, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, elog, Subscription::enabled, ereport, errcode(), errhint(), errmsg(), ERROR, GETSTRUCT, GetSubscription(), GetUserId(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, InvokeObjectPostAlterHook, AlterSubscriptionStmt::kind, load_file(), LockSharedObject(), MyDatabaseId, namein(), OBJECT_SUBSCRIPTION, ObjectAddressSet, AlterSubscriptionStmt::options, parse_subscription_options(), pg_subscription_ownercheck(), AlterSubscriptionStmt::publication, publicationListToArray(), Subscription::publications, RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, AlterSubscriptionStmt::subname, SUBSCRIPTIONNAME, synchronous_commit, HeapTupleData::t_self, table_close(), table_open(), values, and walrcv_check_conninfo.

Referenced by ProcessUtilitySlow().

670 {
671  Relation rel;
672  ObjectAddress myself;
673  bool nulls[Natts_pg_subscription];
674  bool replaces[Natts_pg_subscription];
675  Datum values[Natts_pg_subscription];
676  HeapTuple tup;
677  Oid subid;
678  bool update_tuple = false;
679  Subscription *sub;
681 
682  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
683 
684  /* Fetch the existing tuple. */
686  CStringGetDatum(stmt->subname));
687 
688  if (!HeapTupleIsValid(tup))
689  ereport(ERROR,
690  (errcode(ERRCODE_UNDEFINED_OBJECT),
691  errmsg("subscription \"%s\" does not exist",
692  stmt->subname)));
693 
694  form = (Form_pg_subscription) GETSTRUCT(tup);
695  subid = form->oid;
696 
697  /* must be owner */
698  if (!pg_subscription_ownercheck(subid, GetUserId()))
700  stmt->subname);
701 
702  sub = GetSubscription(subid, false);
703 
704  /* Lock the subscription so nobody else can do anything with it. */
705  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
706 
707  /* Form a new tuple. */
708  memset(values, 0, sizeof(values));
709  memset(nulls, false, sizeof(nulls));
710  memset(replaces, false, sizeof(replaces));
711 
712  switch (stmt->kind)
713  {
715  {
716  char *slotname;
717  bool slotname_given;
718  char *synchronous_commit;
719  bool binary_given;
720  bool binary;
721  bool streaming_given;
722  bool streaming;
723 
725  NULL, /* no "connect" */
726  NULL, NULL, /* no "enabled" */
727  NULL, /* no "create_slot" */
728  &slotname_given, &slotname,
729  NULL, /* no "copy_data" */
730  &synchronous_commit,
731  NULL, /* no "refresh" */
732  &binary_given, &binary,
733  &streaming_given, &streaming);
734 
735  if (slotname_given)
736  {
737  if (sub->enabled && !slotname)
738  ereport(ERROR,
739  (errcode(ERRCODE_SYNTAX_ERROR),
740  errmsg("cannot set %s for enabled subscription",
741  "slot_name = NONE")));
742 
743  if (slotname)
744  values[Anum_pg_subscription_subslotname - 1] =
746  else
747  nulls[Anum_pg_subscription_subslotname - 1] = true;
748  replaces[Anum_pg_subscription_subslotname - 1] = true;
749  }
750 
751  if (synchronous_commit)
752  {
753  values[Anum_pg_subscription_subsynccommit - 1] =
754  CStringGetTextDatum(synchronous_commit);
755  replaces[Anum_pg_subscription_subsynccommit - 1] = true;
756  }
757 
758  if (binary_given)
759  {
760  values[Anum_pg_subscription_subbinary - 1] =
761  BoolGetDatum(binary);
762  replaces[Anum_pg_subscription_subbinary - 1] = true;
763  }
764 
765  if (streaming_given)
766  {
767  values[Anum_pg_subscription_substream - 1] =
768  BoolGetDatum(streaming);
769  replaces[Anum_pg_subscription_substream - 1] = true;
770  }
771 
772  update_tuple = true;
773  break;
774  }
775 
777  {
778  bool enabled,
779  enabled_given;
780 
782  NULL, /* no "connect" */
783  &enabled_given, &enabled,
784  NULL, /* no "create_slot" */
785  NULL, NULL, /* no "slot_name" */
786  NULL, /* no "copy_data" */
787  NULL, /* no "synchronous_commit" */
788  NULL, /* no "refresh" */
789  NULL, NULL, /* no "binary" */
790  NULL, NULL); /* no streaming */
791  Assert(enabled_given);
792 
793  if (!sub->slotname && enabled)
794  ereport(ERROR,
795  (errcode(ERRCODE_SYNTAX_ERROR),
796  errmsg("cannot enable subscription that does not have a slot name")));
797 
798  values[Anum_pg_subscription_subenabled - 1] =
799  BoolGetDatum(enabled);
800  replaces[Anum_pg_subscription_subenabled - 1] = true;
801 
802  if (enabled)
804 
805  update_tuple = true;
806  break;
807  }
808 
810  /* Load the library providing us libpq calls. */
811  load_file("libpqwalreceiver", false);
812  /* Check the connection info string. */
814 
815  values[Anum_pg_subscription_subconninfo - 1] =
817  replaces[Anum_pg_subscription_subconninfo - 1] = true;
818  update_tuple = true;
819  break;
820 
822  {
823  bool copy_data;
824  bool refresh;
825 
827  NULL, /* no "connect" */
828  NULL, NULL, /* no "enabled" */
829  NULL, /* no "create_slot" */
830  NULL, NULL, /* no "slot_name" */
831  &copy_data,
832  NULL, /* no "synchronous_commit" */
833  &refresh,
834  NULL, NULL, /* no "binary" */
835  NULL, NULL); /* no "streaming" */
836  values[Anum_pg_subscription_subpublications - 1] =
838  replaces[Anum_pg_subscription_subpublications - 1] = true;
839 
840  update_tuple = true;
841 
842  /* Refresh if user asked us to. */
843  if (refresh)
844  {
845  if (!sub->enabled)
846  ereport(ERROR,
847  (errcode(ERRCODE_SYNTAX_ERROR),
848  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
849  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
850 
851  /* Make sure refresh sees the new list of publications. */
852  sub->publications = stmt->publication;
853 
854  AlterSubscription_refresh(sub, copy_data);
855  }
856 
857  break;
858  }
859 
861  {
862  bool copy_data;
863 
864  if (!sub->enabled)
865  ereport(ERROR,
866  (errcode(ERRCODE_SYNTAX_ERROR),
867  errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
868 
870  NULL, /* no "connect" */
871  NULL, NULL, /* no "enabled" */
872  NULL, /* no "create_slot" */
873  NULL, NULL, /* no "slot_name" */
874  &copy_data,
875  NULL, /* no "synchronous_commit" */
876  NULL, /* no "refresh" */
877  NULL, NULL, /* no "binary" */
878  NULL, NULL); /* no "streaming" */
879 
880  AlterSubscription_refresh(sub, copy_data);
881 
882  break;
883  }
884 
885  default:
886  elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
887  stmt->kind);
888  }
889 
890  /* Update the catalog if needed. */
891  if (update_tuple)
892  {
893  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
894  replaces);
895 
896  CatalogTupleUpdate(rel, &tup->t_self, tup);
897 
898  heap_freetuple(tup);
899  }
900 
902 
903  ObjectAddressSet(myself, SubscriptionRelationId, subid);
904 
905  InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
906 
907  return myself;
908 }
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
#define RelationGetDescr(relation)
Definition: rel.h:482
Oid GetUserId(void)
Definition: miscinit.c:476
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:400
int errcode(int sqlerrcode)
Definition: elog.c:610
static Datum publicationListToArray(List *publist)
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:624
FormData_pg_subscription * Form_pg_subscription
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh, bool *binary_given, bool *binary, bool *streaming_given, bool *streaming)
AlterSubscriptionType kind
Definition: parsenodes.h:3561
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
List * publications
int synchronous_commit
Definition: xact.c:83
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1017
#define BoolGetDatum(X)
Definition: postgres.h:402
#define ereport(elevel,...)
Definition: elog.h:144
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:745
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:165
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
#define CStringGetTextDatum(s)
Definition: builtins.h:86
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:933
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:176
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
static void AlterSubscription_refresh(Subscription *sub, bool copy_data)
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5251

◆ AlterSubscriptionOwner()

ObjectAddress AlterSubscriptionOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 1156 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), HeapTupleIsValid, MyDatabaseId, ObjectAddressSet, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONNAME, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

1157 {
1158  Oid subid;
1159  HeapTuple tup;
1160  Relation rel;
1161  ObjectAddress address;
1162  Form_pg_subscription form;
1163 
1164  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1165 
1168 
1169  if (!HeapTupleIsValid(tup))
1170  ereport(ERROR,
1171  (errcode(ERRCODE_UNDEFINED_OBJECT),
1172  errmsg("subscription \"%s\" does not exist", name)));
1173 
1174  form = (Form_pg_subscription) GETSTRUCT(tup);
1175  subid = form->oid;
1176 
1177  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1178 
1179  ObjectAddressSet(address, SubscriptionRelationId, subid);
1180 
1181  heap_freetuple(tup);
1182 
1184 
1185  return address;
1186 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
int errcode(int sqlerrcode)
Definition: elog.c:610
FormData_pg_subscription * Form_pg_subscription
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
Oid MyDatabaseId
Definition: globals.c:85
#define ereport(elevel,...)
Definition: elog.h:144
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
const char * name
Definition: encode.c:561
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:176
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ AlterSubscriptionOwner_oid()

void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 1192 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, table_close(), and table_open().

Referenced by shdepReassignOwned().

1193 {
1194  HeapTuple tup;
1195  Relation rel;
1196 
1197  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1198 
1200 
1201  if (!HeapTupleIsValid(tup))
1202  ereport(ERROR,
1203  (errcode(ERRCODE_UNDEFINED_OBJECT),
1204  errmsg("subscription with OID %u does not exist", subid)));
1205 
1206  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1207 
1208  heap_freetuple(tup);
1209 
1211 }
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
int errcode(int sqlerrcode)
Definition: elog.c:610
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
#define ereport(elevel,...)
Definition: elog.h:144
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
int errmsg(const char *fmt,...)
Definition: elog.c:824
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39

◆ CreateSubscription()

ObjectAddress CreateSubscription ( CreateSubscriptionStmt stmt,
bool  isTopLevel 
)

Definition at line 343 of file subscriptioncmds.c.

References AccessShareLock, AddSubscriptionRelState(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum, CatalogTupleInsert(), CheckSubscriptionRelkind(), connect, CreateSubscriptionStmt::conninfo, create_slot, CRS_NOEXPORT_SNAPSHOT, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, elog, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, fetch_table_list(), get_rel_relkind(), GetNewOidWithIndex(), GetSysCacheOid2, GetUserId(), heap_form_tuple(), heap_freetuple(), InvalidXLogRecPtr, InvokeObjectPostCreateHook, lfirst, load_file(), MyDatabaseId, NAMEDATALEN, namein(), NOTICE, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, CreateSubscriptionStmt::options, parse_subscription_options(), PG_END_TRY, PG_FINALLY, PG_TRY, PreventInTransactionBlock(), CreateSubscriptionStmt::publication, publicationListToArray(), RangeVarGetRelid, recordDependencyOnOwner(), RelationGetDescr, RangeVar::relname, replorigin_create(), RowExclusiveLock, RangeVar::schemaname, snprintf, CreateSubscriptionStmt::subname, SUBSCRIPTIONNAME, SubscriptionObjectIndexId, superuser(), synchronous_commit, table_close(), table_open(), values, walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, WARNING, and wrconn.

Referenced by ProcessUtilitySlow().

344 {
345  Relation rel;
346  ObjectAddress myself;
347  Oid subid;
348  bool nulls[Natts_pg_subscription];
349  Datum values[Natts_pg_subscription];
350  Oid owner = GetUserId();
351  HeapTuple tup;
352  bool connect;
353  bool enabled_given;
354  bool enabled;
355  bool copy_data;
356  bool streaming;
357  bool streaming_given;
358  char *synchronous_commit;
359  char *conninfo;
360  char *slotname;
361  bool slotname_given;
362  bool binary;
363  bool binary_given;
364  char originname[NAMEDATALEN];
365  bool create_slot;
366  List *publications;
367 
368  /*
369  * Parse and check options.
370  *
371  * Connection and publication should not be specified here.
372  */
374  &connect,
375  &enabled_given, &enabled,
376  &create_slot,
377  &slotname_given, &slotname,
378  &copy_data,
379  &synchronous_commit,
380  NULL, /* no "refresh" */
381  &binary_given, &binary,
382  &streaming_given, &streaming);
383 
384  /*
385  * Since creating a replication slot is not transactional, rolling back
386  * the transaction leaves the created replication slot. So we cannot run
387  * CREATE SUBSCRIPTION inside a transaction block if creating a
388  * replication slot.
389  */
390  if (create_slot)
391  PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
392 
393  if (!superuser())
394  ereport(ERROR,
395  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
396  errmsg("must be superuser to create subscriptions")));
397 
398  /*
399  * If built with appropriate switch, whine when regression-testing
400  * conventions for subscription names are violated.
401  */
402 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
403  if (strncmp(stmt->subname, "regress_", 8) != 0)
404  elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
405 #endif
406 
407  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
408 
409  /* Check if name is used */
410  subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
412  if (OidIsValid(subid))
413  {
414  ereport(ERROR,
416  errmsg("subscription \"%s\" already exists",
417  stmt->subname)));
418  }
419 
420  if (!slotname_given && slotname == NULL)
421  slotname = stmt->subname;
422 
423  /* The default for synchronous_commit of subscriptions is off. */
424  if (synchronous_commit == NULL)
425  synchronous_commit = "off";
426 
427  conninfo = stmt->conninfo;
428  publications = stmt->publication;
429 
430  /* Load the library providing us libpq calls. */
431  load_file("libpqwalreceiver", false);
432 
433  /* Check the connection info string. */
434  walrcv_check_conninfo(conninfo);
435 
436  /* Everything ok, form a new tuple. */
437  memset(values, 0, sizeof(values));
438  memset(nulls, false, sizeof(nulls));
439 
441  Anum_pg_subscription_oid);
442  values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
443  values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
444  values[Anum_pg_subscription_subname - 1] =
446  values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
447  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
448  values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
449  values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
450  values[Anum_pg_subscription_subconninfo - 1] =
451  CStringGetTextDatum(conninfo);
452  if (slotname)
453  values[Anum_pg_subscription_subslotname - 1] =
455  else
456  nulls[Anum_pg_subscription_subslotname - 1] = true;
457  values[Anum_pg_subscription_subsynccommit - 1] =
458  CStringGetTextDatum(synchronous_commit);
459  values[Anum_pg_subscription_subpublications - 1] =
460  publicationListToArray(publications);
461 
462  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
463 
464  /* Insert tuple into catalog. */
465  CatalogTupleInsert(rel, tup);
466  heap_freetuple(tup);
467 
468  recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
469 
470  snprintf(originname, sizeof(originname), "pg_%u", subid);
471  replorigin_create(originname);
472 
473  /*
474  * Connect to remote side to execute requested commands and fetch table
475  * info.
476  */
477  if (connect)
478  {
479  char *err;
481  List *tables;
482  ListCell *lc;
483  char table_state;
484 
485  /* Try to connect to the publisher. */
486  wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
487  if (!wrconn)
488  ereport(ERROR,
489  (errmsg("could not connect to the publisher: %s", err)));
490 
491  PG_TRY();
492  {
493  /*
494  * Set sync state based on if we were asked to do data copy or
495  * not.
496  */
497  table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
498 
499  /*
500  * Get the table list from publisher and build local table status
501  * info.
502  */
503  tables = fetch_table_list(wrconn, publications);
504  foreach(lc, tables)
505  {
506  RangeVar *rv = (RangeVar *) lfirst(lc);
507  Oid relid;
508 
509  relid = RangeVarGetRelid(rv, AccessShareLock, false);
510 
511  /* Check for supported relkind. */
513  rv->schemaname, rv->relname);
514 
515  AddSubscriptionRelState(subid, relid, table_state,
517  }
518 
519  /*
520  * If requested, create permanent slot for the subscription. We
521  * won't use the initial snapshot for anything, so no need to
522  * export it.
523  */
524  if (create_slot)
525  {
526  Assert(slotname);
527 
528  walrcv_create_slot(wrconn, slotname, false,
529  CRS_NOEXPORT_SNAPSHOT, NULL);
530  ereport(NOTICE,
531  (errmsg("created replication slot \"%s\" on publisher",
532  slotname)));
533  }
534  }
535  PG_FINALLY();
536  {
537  walrcv_disconnect(wrconn);
538  }
539  PG_END_TRY();
540  }
541  else
543  /* translator: %s is an SQL ALTER statement */
544  (errmsg("tables were not subscribed, you will have to run %s to subscribe the tables",
545  "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
546 
548 
549  if (enabled)
551 
552  ObjectAddressSet(myself, SubscriptionRelationId, subid);
553 
554  InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
555 
556  return myself;
557 }
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:317
WalReceiverConn * wrconn
Definition: worker.c:161
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
#define RelationGetDescr(relation)
Definition: rel.h:482
Oid GetUserId(void)
Definition: miscinit.c:476
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:78
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:400
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1915
#define AccessShareLock
Definition: lockdefs.h:36
static bool create_slot
int errcode(int sqlerrcode)
Definition: elog.c:610
bool superuser(void)
Definition: superuser.c:46
static Datum publicationListToArray(List *publist)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define connect(s, name, namelen)
Definition: win32_port.h:435
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:624
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:164
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:420
#define OidIsValid(objectId)
Definition: c.h:651
char * schemaname
Definition: primnodes.h:67
#define NAMEDATALEN
char * relname
Definition: primnodes.h:68
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh, bool *binary_given, bool *binary, bool *streaming_given, bool *streaming)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:578
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3380
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
int synchronous_commit
Definition: xact.c:83
#define WARNING
Definition: elog.h:40
#define PG_FINALLY()
Definition: elog.h:312
uintptr_t Datum
Definition: postgres.h:367
Oid MyDatabaseId
Definition: globals.c:85
#define BoolGetDatum(X)
Definition: postgres.h:402
#define ereport(elevel,...)
Definition: elog.h:144
#define NOTICE
Definition: elog.h:37
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:190
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:194
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
#define walrcv_disconnect(conn)
Definition: walreceiver.h:426
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:165
int errmsg(const char *fmt,...)
Definition: elog.c:824
#define elog(elevel,...)
Definition: elog.h:214
#define CStringGetTextDatum(s)
Definition: builtins.h:86
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:933
#define PG_TRY()
Definition: elog.h:295
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
Definition: pg_list.h:50
#define snprintf
Definition: port.h:193
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:221
#define PG_END_TRY()
Definition: elog.h:320
#define SubscriptionObjectIndexId
Definition: indexing.h:369
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:398

◆ DropSubscription()

void DropSubscription ( DropSubscriptionStmt stmt,
bool  isTopLevel 
)

Definition at line 914 of file subscriptioncmds.c.

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, appendStringInfo(), Assert, CatalogTupleDelete(), CStringGetDatum, StringInfoData::data, DatumGetName, deleteSharedDependencyRecordsFor(), ereport, WalRcvExecResult::err, errcode(), errdetail(), errhint(), errmsg(), ERROR, EventTriggerSQLDropAddObject(), GETSTRUCT, GetUserId(), HeapTupleIsValid, initStringInfo(), InvalidOid, InvalidRepOriginId, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), DropSubscriptionStmt::missing_ok, MyDatabaseId, NAMEDATALEN, NameStr, NoLock, NOTICE, OBJECT_SUBSCRIPTION, ObjectAddressSet, pfree(), PG_END_TRY, PG_FINALLY, pg_subscription_ownercheck(), PG_TRY, PreventInTransactionBlock(), pstrdup(), quote_identifier(), ReleaseSysCache(), LogicalRepWorker::relid, RemoveSubscriptionRel(), replorigin_by_name(), replorigin_drop(), SearchSysCache2(), snprintf, WalRcvExecResult::status, LogicalRepWorker::subid, subname, DropSubscriptionStmt::subname, SUBSCRIPTIONNAME, SUBSCRIPTIONOID, SysCacheGetAttr(), HeapTupleData::t_self, table_close(), table_open(), TextDatumGetCString, walrcv_clear_result(), walrcv_connect, walrcv_disconnect, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by ProcessUtilitySlow().

915 {
916  Relation rel;
917  ObjectAddress myself;
918  HeapTuple tup;
919  Oid subid;
920  Datum datum;
921  bool isnull;
922  char *subname;
923  char *conninfo;
924  char *slotname;
925  List *subworkers;
926  ListCell *lc;
927  char originname[NAMEDATALEN];
928  char *err = NULL;
929  RepOriginId originid;
930  WalReceiverConn *wrconn = NULL;
931  StringInfoData cmd;
933 
934  /*
935  * Lock pg_subscription with AccessExclusiveLock to ensure that the
936  * launcher doesn't restart new worker during dropping the subscription
937  */
938  rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
939 
941  CStringGetDatum(stmt->subname));
942 
943  if (!HeapTupleIsValid(tup))
944  {
945  table_close(rel, NoLock);
946 
947  if (!stmt->missing_ok)
948  ereport(ERROR,
949  (errcode(ERRCODE_UNDEFINED_OBJECT),
950  errmsg("subscription \"%s\" does not exist",
951  stmt->subname)));
952  else
953  ereport(NOTICE,
954  (errmsg("subscription \"%s\" does not exist, skipping",
955  stmt->subname)));
956 
957  return;
958  }
959 
960  form = (Form_pg_subscription) GETSTRUCT(tup);
961  subid = form->oid;
962 
963  /* must be owner */
964  if (!pg_subscription_ownercheck(subid, GetUserId()))
966  stmt->subname);
967 
968  /* DROP hook for the subscription being removed */
969  InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
970 
971  /*
972  * Lock the subscription so nobody else can do anything with it (including
973  * the replication workers).
974  */
975  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
976 
977  /* Get subname */
978  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
979  Anum_pg_subscription_subname, &isnull);
980  Assert(!isnull);
981  subname = pstrdup(NameStr(*DatumGetName(datum)));
982 
983  /* Get conninfo */
984  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
985  Anum_pg_subscription_subconninfo, &isnull);
986  Assert(!isnull);
987  conninfo = TextDatumGetCString(datum);
988 
989  /* Get slotname */
990  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
991  Anum_pg_subscription_subslotname, &isnull);
992  if (!isnull)
993  slotname = pstrdup(NameStr(*DatumGetName(datum)));
994  else
995  slotname = NULL;
996 
997  /*
998  * Since dropping a replication slot is not transactional, the replication
999  * slot stays dropped even if the transaction rolls back. So we cannot
1000  * run DROP SUBSCRIPTION inside a transaction block if dropping the
1001  * replication slot.
1002  *
1003  * XXX The command name should really be something like "DROP SUBSCRIPTION
1004  * of a subscription that is associated with a replication slot", but we
1005  * don't have the proper facilities for that.
1006  */
1007  if (slotname)
1008  PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
1009 
1010  ObjectAddressSet(myself, SubscriptionRelationId, subid);
1011  EventTriggerSQLDropAddObject(&myself, true, true);
1012 
1013  /* Remove the tuple from catalog. */
1014  CatalogTupleDelete(rel, &tup->t_self);
1015 
1016  ReleaseSysCache(tup);
1017 
1018  /*
1019  * Stop all the subscription workers immediately.
1020  *
1021  * This is necessary if we are dropping the replication slot, so that the
1022  * slot becomes accessible.
1023  *
1024  * It is also necessary if the subscription is disabled and was disabled
1025  * in the same transaction. Then the workers haven't seen the disabling
1026  * yet and will still be running, leading to hangs later when we want to
1027  * drop the replication origin. If the subscription was disabled before
1028  * this transaction, then there shouldn't be any workers left, so this
1029  * won't make a difference.
1030  *
1031  * New workers won't be started because we hold an exclusive lock on the
1032  * subscription till the end of the transaction.
1033  */
1034  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1035  subworkers = logicalrep_workers_find(subid, false);
1036  LWLockRelease(LogicalRepWorkerLock);
1037  foreach(lc, subworkers)
1038  {
1040 
1042  }
1043  list_free(subworkers);
1044 
1045  /* Clean up dependencies */
1046  deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
1047 
1048  /* Remove any associated relation synchronization states. */
1050 
1051  /* Remove the origin tracking if exists. */
1052  snprintf(originname, sizeof(originname), "pg_%u", subid);
1053  originid = replorigin_by_name(originname, true);
1054  if (originid != InvalidRepOriginId)
1055  replorigin_drop(originid, false);
1056 
1057  /*
1058  * If there is no slot associated with the subscription, we can finish
1059  * here.
1060  */
1061  if (!slotname)
1062  {
1063  table_close(rel, NoLock);
1064  return;
1065  }
1066 
1067  /*
1068  * Otherwise drop the replication slot at the publisher node using the
1069  * replication connection.
1070  */
1071  load_file("libpqwalreceiver", false);
1072 
1073  initStringInfo(&cmd);
1074  appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
1075 
1076  wrconn = walrcv_connect(conninfo, true, subname, &err);
1077  if (wrconn == NULL)
1078  ereport(ERROR,
1079  (errmsg("could not connect to publisher when attempting to "
1080  "drop the replication slot \"%s\"", slotname),
1081  errdetail("The error was: %s", err),
1082  /* translator: %s is an SQL ALTER command */
1083  errhint("Use %s to disassociate the subscription from the slot.",
1084  "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
1085 
1086  PG_TRY();
1087  {
1088  WalRcvExecResult *res;
1089 
1090  res = walrcv_exec(wrconn, cmd.data, 0, NULL);
1091 
1092  if (res->status != WALRCV_OK_COMMAND)
1093  ereport(ERROR,
1094  (errmsg("could not drop the replication slot \"%s\" on publisher",
1095  slotname),
1096  errdetail("The error was: %s", res->err)));
1097  else
1098  ereport(NOTICE,
1099  (errmsg("dropped replication slot \"%s\" on publisher",
1100  slotname)));
1101 
1102  walrcv_clear_result(res);
1103  }
1104  PG_FINALLY();
1105  {
1106  walrcv_disconnect(wrconn);
1107  }
1108  PG_END_TRY();
1109 
1110  pfree(cmd.data);
1111 
1112  table_close(rel, NoLock);
1113 }
WalReceiverConn * wrconn
Definition: worker.c:161
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:263
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
int errhint(const char *fmt,...)
Definition: elog.c:1071
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
void RemoveSubscriptionRel(Oid subid, Oid relid)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10706
Oid GetUserId(void)
Definition: miscinit.c:476
char * pstrdup(const char *in)
Definition: mcxt.c:1187
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:332
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:160
uint16 RepOriginId
Definition: xlogdefs.h:58
int errcode(int sqlerrcode)
Definition: elog.c:610
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:350
FormData_pg_subscription * Form_pg_subscription
unsigned int Oid
Definition: postgres_ext.h:31
NameData subname
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:209
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define NAMEDATALEN
#define DatumGetName(X)
Definition: postgres.h:585
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:430
void pfree(void *pointer)
Definition: mcxt.c:1057
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
#define NoLock
Definition: lockdefs.h:34
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:456
int errdetail(const char *fmt,...)
Definition: elog.c:957
#define CStringGetDatum(X)
Definition: postgres.h:578
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3380
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:945
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define PG_FINALLY()
Definition: elog.h:312
#define TextDatumGetCString(d)
Definition: builtins.h:87
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1377
Oid MyDatabaseId
Definition: globals.c:85
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1017
#define InvalidOid
Definition: postgres_ext.h:36
#define ereport(elevel,...)
Definition: elog.h:144
#define NOTICE
Definition: elog.h:37
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:745
#define lfirst(lc)
Definition: pg_list.h:190
WalRcvExecStatus status
Definition: walreceiver.h:212
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:1127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
#define walrcv_disconnect(conn)
Definition: walreceiver.h:426
#define InvalidRepOriginId
Definition: origin.h:33
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define AccessExclusiveLock
Definition: lockdefs.h:45
int errmsg(const char *fmt,...)
Definition: elog.c:824
void list_free(List *list)
Definition: list.c:1376
#define NameStr(name)
Definition: c.h:622
#define PG_TRY()
Definition: elog.h:295
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
Definition: pg_list.h:50
#define snprintf
Definition: port.h:193
#define PG_END_TRY()
Definition: elog.h:320
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:424
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5251
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:398