PostgreSQL Source Code  git master
subscriptioncmds.h File Reference
Include dependency graph for subscriptioncmds.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

ObjectAddress CreateSubscription (ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscription (ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 
char defGetStreamingMode (DefElem *def)
 

Function Documentation

◆ AlterSubscription()

ObjectAddress AlterSubscription ( ParseState *  pstate,
AlterSubscriptionStmt *  stmt,
bool  isTopLevel 
)

Definition at line 1004 of file subscriptioncmds.c.

1006 {
1007  Relation rel;
1008  ObjectAddress myself;
1009  bool nulls[Natts_pg_subscription];
1010  bool replaces[Natts_pg_subscription];
1011  Datum values[Natts_pg_subscription];
1012  HeapTuple tup;
1013  Oid subid;
1014  bool update_tuple = false;
1015  Subscription *sub;
1016  Form_pg_subscription form;
1017  bits32 supported_opts;
1018  SubOpts opts = {0};
1019 
1020  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1021 
1022  /* Fetch the existing tuple. */
1023  tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
1024  CStringGetDatum(stmt->subname));
1025 
1026  if (!HeapTupleIsValid(tup))
1027  ereport(ERROR,
1028  (errcode(ERRCODE_UNDEFINED_OBJECT),
1029  errmsg("subscription \"%s\" does not exist",
1030  stmt->subname)));
1031 
1032  form = (Form_pg_subscription) GETSTRUCT(tup);
1033  subid = form->oid;
1034 
1035  /* must be owner */
1036  if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1037  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1038  stmt->subname);
1039 
1040  sub = GetSubscription(subid, false);
1041 
1042  /* Lock the subscription so nobody else can do anything with it. */
1043  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1044 
1045  /* Form a new tuple. */
1046  memset(values, 0, sizeof(values));
1047  memset(nulls, false, sizeof(nulls));
1048  memset(replaces, false, sizeof(replaces));
1049 
1050  switch (stmt->kind)
1051  {
1053  {
1054  supported_opts = (SUBOPT_SLOT_NAME |
1057  SUBOPT_ORIGIN);
1058 
1059  parse_subscription_options(pstate, stmt->options,
1060  supported_opts, &opts);
1061 
1062  if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1063  {
1064  /*
1065  * The subscription must be disabled to allow slot_name as
1066  * 'none', otherwise, the apply worker will repeatedly try
1067  * to stream the data using that slot_name which neither
1068  * exists on the publisher nor the user will be allowed to
1069  * create it.
1070  */
1071  if (sub->enabled && !opts.slot_name)
1072  ereport(ERROR,
1073  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1074  errmsg("cannot set %s for enabled subscription",
1075  "slot_name = NONE")));
1076 
1077  if (opts.slot_name)
1078  values[Anum_pg_subscription_subslotname - 1] =
1080  else
1081  nulls[Anum_pg_subscription_subslotname - 1] = true;
1082  replaces[Anum_pg_subscription_subslotname - 1] = true;
1083  }
1084 
1085  if (opts.synchronous_commit)
1086  {
1087  values[Anum_pg_subscription_subsynccommit - 1] =
1088  CStringGetTextDatum(opts.synchronous_commit);
1089  replaces[Anum_pg_subscription_subsynccommit - 1] = true;
1090  }
1091 
1092  if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1093  {
1094  values[Anum_pg_subscription_subbinary - 1] =
1095  BoolGetDatum(opts.binary);
1096  replaces[Anum_pg_subscription_subbinary - 1] = true;
1097  }
1098 
1099  if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1100  {
1101  values[Anum_pg_subscription_substream - 1] =
1102  CharGetDatum(opts.streaming);
1103  replaces[Anum_pg_subscription_substream - 1] = true;
1104  }
1105 
1106  if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1107  {
1108  values[Anum_pg_subscription_subdisableonerr - 1]
1109  = BoolGetDatum(opts.disableonerr);
1110  replaces[Anum_pg_subscription_subdisableonerr - 1]
1111  = true;
1112  }
1113 
1114  if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1115  {
1116  values[Anum_pg_subscription_suborigin - 1] =
1117  CStringGetTextDatum(opts.origin);
1118  replaces[Anum_pg_subscription_suborigin - 1] = true;
1119  }
1120 
1121  update_tuple = true;
1122  break;
1123  }
1124 
1126  {
1127  parse_subscription_options(pstate, stmt->options,
1128  SUBOPT_ENABLED, &opts);
1129  Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1130 
1131  if (!sub->slotname && opts.enabled)
1132  ereport(ERROR,
1133  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1134  errmsg("cannot enable subscription that does not have a slot name")));
1135 
1136  values[Anum_pg_subscription_subenabled - 1] =
1137  BoolGetDatum(opts.enabled);
1138  replaces[Anum_pg_subscription_subenabled - 1] = true;
1139 
1140  if (opts.enabled)
1142 
1143  update_tuple = true;
1144  break;
1145  }
1146 
1148  /* Load the library providing us libpq calls. */
1149  load_file("libpqwalreceiver", false);
1150  /* Check the connection info string. */
1152 
1153  values[Anum_pg_subscription_subconninfo - 1] =
1155  replaces[Anum_pg_subscription_subconninfo - 1] = true;
1156  update_tuple = true;
1157  break;
1158 
1160  {
1161  supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
1162  parse_subscription_options(pstate, stmt->options,
1163  supported_opts, &opts);
1164 
1165  values[Anum_pg_subscription_subpublications - 1] =
1167  replaces[Anum_pg_subscription_subpublications - 1] = true;
1168 
1169  update_tuple = true;
1170 
1171  /* Refresh if user asked us to. */
1172  if (opts.refresh)
1173  {
1174  if (!sub->enabled)
1175  ereport(ERROR,
1176  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1177  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1178  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1179 
1180  /*
1181  * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1182  * not allowed.
1183  */
1184  if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1185  ereport(ERROR,
1186  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1187  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1188  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1189 
1190  PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1191 
1192  /* Make sure refresh sees the new list of publications. */
1193  sub->publications = stmt->publication;
1194 
1195  AlterSubscription_refresh(sub, opts.copy_data,
1196  stmt->publication);
1197  }
1198 
1199  break;
1200  }
1201 
1204  {
1205  List *publist;
1206  bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
1207 
1208  supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
1209  parse_subscription_options(pstate, stmt->options,
1210  supported_opts, &opts);
1211 
1212  publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1213  values[Anum_pg_subscription_subpublications - 1] =
1214  publicationListToArray(publist);
1215  replaces[Anum_pg_subscription_subpublications - 1] = true;
1216 
1217  update_tuple = true;
1218 
1219  /* Refresh if user asked us to. */
1220  if (opts.refresh)
1221  {
1222  /* We only need to validate user specified publications. */
1223  List *validate_publications = (isadd) ? stmt->publication : NULL;
1224 
1225  if (!sub->enabled)
1226  ereport(ERROR,
1227  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1228  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1229  /* translator: %s is an SQL ALTER command */
1230  errhint("Use %s instead.",
1231  isadd ?
1232  "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1233  "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1234 
1235  /*
1236  * See ALTER_SUBSCRIPTION_REFRESH for details why this is
1237  * not allowed.
1238  */
1239  if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1240  ereport(ERROR,
1241  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1242  errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1243  /* translator: %s is an SQL ALTER command */
1244  errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1245  isadd ?
1246  "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1247  "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1248 
1249  PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1250 
1251  /* Refresh the new list of publications. */
1252  sub->publications = publist;
1253 
1254  AlterSubscription_refresh(sub, opts.copy_data,
1255  validate_publications);
1256  }
1257 
1258  break;
1259  }
1260 
1262  {
1263  if (!sub->enabled)
1264  ereport(ERROR,
1265  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1266  errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1267 
1268  parse_subscription_options(pstate, stmt->options,
1270 
1271  /*
1272  * The subscription option "two_phase" requires that
1273  * replication has passed the initial table synchronization
1274  * phase before the two_phase becomes properly enabled.
1275  *
1276  * But, having reached this two-phase commit "enabled" state
1277  * we must not allow any subsequent table initialization to
1278  * occur. So the ALTER SUBSCRIPTION ... REFRESH is disallowed
1279  * when the user had requested two_phase = on mode.
1280  *
1281  * The exception to this restriction is when copy_data =
1282  * false, because when copy_data is false the tablesync will
1283  * start already in READY state and will exit directly without
1284  * doing anything.
1285  *
1286  * For more details see comments atop worker.c.
1287  */
1288  if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1289  ereport(ERROR,
1290  (errcode(ERRCODE_SYNTAX_ERROR),
1291  errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1292  errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1293 
1294  PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
1295 
1296  AlterSubscription_refresh(sub, opts.copy_data, NULL);
1297 
1298  break;
1299  }
1300 
1302  {
1304 
1305  /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1306  Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1307 
1308  if (!superuser())
1309  ereport(ERROR,
1310  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1311  errmsg("must be superuser to skip transaction")));
1312 
1313  /*
1314  * If the user sets subskiplsn, we do a sanity check to make
1315  * sure that the specified LSN is a probable value.
1316  */
1317  if (!XLogRecPtrIsInvalid(opts.lsn))
1318  {
1319  RepOriginId originid;
1320  char originname[NAMEDATALEN];
1321  XLogRecPtr remote_lsn;
1322 
1324  originname, sizeof(originname));
1325  originid = replorigin_by_name(originname, false);
1326  remote_lsn = replorigin_get_progress(originid, false);
1327 
1328  /* Check the given LSN is at least a future LSN */
1329  if (!XLogRecPtrIsInvalid(remote_lsn) && opts.lsn < remote_lsn)
1330  ereport(ERROR,
1331  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1332  errmsg("skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
1333  LSN_FORMAT_ARGS(opts.lsn),
1334  LSN_FORMAT_ARGS(remote_lsn))));
1335  }
1336 
1337  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(opts.lsn);
1338  replaces[Anum_pg_subscription_subskiplsn - 1] = true;
1339 
1340  update_tuple = true;
1341  break;
1342  }
1343 
1344  default:
1345  elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1346  stmt->kind);
1347  }
1348 
1349  /* Update the catalog if needed. */
1350  if (update_tuple)
1351  {
1352  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1353  replaces);
1354 
1355  CatalogTupleUpdate(rel, &tup->t_self, tup);
1356 
1357  heap_freetuple(tup);
1358  }
1359 
1361 
1362  ObjectAddressSet(myself, SubscriptionRelationId, subid);
1363 
1364  InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
1365 
1366  /* Wake up related replication workers to handle this change quickly. */
1368 
1369  return myself;
1370 }
@ ACLCHECK_NOT_OWNER
Definition: acl.h:185
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:2679
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition: aclchk.c:3984
void LogicalRepWorkersWakeupAtCommit(Oid subid)
Definition: worker.c:4958
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition: worker.c:454
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define CStringGetTextDatum(s)
Definition: builtins.h:94
uint32 bits32
Definition: c.h:499
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:144
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:642
Oid MyDatabaseId
Definition: globals.c:89
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:1061
Assert(fmt[strlen(fmt) - 1] !='\n')
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
#define AccessExclusiveLock
Definition: lockdefs.h:43
#define RowExclusiveLock
Definition: lockdefs.h:38
Oid GetUserId(void)
Definition: miscinit.c:502
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:197
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition: origin.c:221
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition: origin.c:1014
@ ALTER_SUBSCRIPTION_ENABLED
Definition: parsenodes.h:3913
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
Definition: parsenodes.h:3911
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
Definition: parsenodes.h:3909
@ ALTER_SUBSCRIPTION_REFRESH
Definition: parsenodes.h:3912
@ ALTER_SUBSCRIPTION_SKIP
Definition: parsenodes.h:3914
@ ALTER_SUBSCRIPTION_OPTIONS
Definition: parsenodes.h:3907
@ ALTER_SUBSCRIPTION_CONNECTION
Definition: parsenodes.h:3908
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
Definition: parsenodes.h:3910
@ OBJECT_SUBSCRIPTION
Definition: parsenodes.h:2007
static AmcheckOptions opts
Definition: pg_amcheck.c:110
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
Subscription * GetSubscription(Oid subid, bool missing_ok)
#define LOGICALREP_TWOPHASE_STATE_ENABLED
FormData_pg_subscription * Form_pg_subscription
uintptr_t Datum
Definition: postgres.h:64
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
static Datum CStringGetDatum(const char *X)
Definition: postgres.h:350
static Datum CharGetDatum(char X)
Definition: postgres.h:122
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
#define RelationGetDescr(relation)
Definition: rel.h:527
AlterSubscriptionType kind
Definition: parsenodes.h:3920
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
#define SUBOPT_STREAMING
#define SUBOPT_SYNCHRONOUS_COMMIT
#define SUBOPT_ENABLED
#define SUBOPT_ORIGIN
static Datum publicationListToArray(List *publist)
static void parse_subscription_options(ParseState *pstate, List *stmt_options, bits32 supported_opts, SubOpts *opts)
#define SUBOPT_SLOT_NAME
#define SUBOPT_COPY_DATA
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
#define SUBOPT_DISABLE_ON_ERR
#define SUBOPT_LSN
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
#define SUBOPT_BINARY
#define IsSet(val, bits)
#define SUBOPT_REFRESH
bool superuser(void)
Definition: superuser.c:46
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:181
@ SUBSCRIPTIONNAME
Definition: syscache.h:98
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:410
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3488
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29
uint16 RepOriginId
Definition: xlogdefs.h:65
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_REFRESH, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_SKIP, AlterSubscription_refresh(), ApplyLauncherWakeupAtCommit(), Assert(), BoolGetDatum(), CatalogTupleUpdate(), CharGetDatum(), AlterSubscriptionStmt::conninfo, CStringGetDatum(), CStringGetTextDatum, DirectFunctionCall1, elog(), Subscription::enabled, ereport, errcode(), errhint(), errmsg(), ERROR, GETSTRUCT, GetSubscription(), GetUserId(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, if(), InvalidOid, InvokeObjectPostAlterHook, IsSet, AlterSubscriptionStmt::kind, load_file(), LockSharedObject(), LOGICALREP_TWOPHASE_STATE_ENABLED, LogicalRepWorkersWakeupAtCommit(), LSN_FORMAT_ARGS, LSNGetDatum(), merge_publications(), MyDatabaseId, NAMEDATALEN, namein(), object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, AlterSubscriptionStmt::options, opts, parse_subscription_options(), PreventInTransactionBlock(), AlterSubscriptionStmt::publication, publicationListToArray(), Subscription::publications, RelationGetDescr, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_get_progress(), RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, AlterSubscriptionStmt::subname, SUBOPT_BINARY, SUBOPT_COPY_DATA, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_LSN, SUBOPT_ORIGIN, SUBOPT_REFRESH, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBSCRIPTIONNAME, superuser(), HeapTupleData::t_self, table_close(), table_open(), Subscription::twophasestate, values, walrcv_check_conninfo, and XLogRecPtrIsInvalid.

Referenced by ProcessUtilitySlow().

◆ AlterSubscriptionOwner()

ObjectAddress AlterSubscriptionOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 1758 of file subscriptioncmds.c.

1759 {
1760  Oid subid;
1761  HeapTuple tup;
1762  Relation rel;
1763  ObjectAddress address;
1764  Form_pg_subscription form;
1765 
1766  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1767 
1770 
1771  if (!HeapTupleIsValid(tup))
1772  ereport(ERROR,
1773  (errcode(ERRCODE_UNDEFINED_OBJECT),
1774  errmsg("subscription \"%s\" does not exist", name)));
1775 
1776  form = (Form_pg_subscription) GETSTRUCT(tup);
1777  subid = form->oid;
1778 
1779  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1780 
1781  ObjectAddressSet(address, SubscriptionRelationId, subid);
1782 
1783  heap_freetuple(tup);
1784 
1786 
1787  return address;
1788 }
const char * name
Definition: encode.c:571
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)

References AlterSubscriptionOwner_internal(), CStringGetDatum(), ereport, errcode(), errmsg(), ERROR, GETSTRUCT, heap_freetuple(), HeapTupleIsValid, MyDatabaseId, name, ObjectAddressSet, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONNAME, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

◆ AlterSubscriptionOwner_oid()

void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 1794 of file subscriptioncmds.c.

1795 {
1796  HeapTuple tup;
1797  Relation rel;
1798 
1799  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1800 
1802 
1803  if (!HeapTupleIsValid(tup))
1804  ereport(ERROR,
1805  (errcode(ERRCODE_UNDEFINED_OBJECT),
1806  errmsg("subscription with OID %u does not exist", subid)));
1807 
1808  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1809 
1810  heap_freetuple(tup);
1811 
1813 }
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:179
@ SUBSCRIPTIONOID
Definition: syscache.h:99

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, table_close(), and table_open().

Referenced by shdepReassignOwned().

◆ CreateSubscription()

ObjectAddress CreateSubscription ( ParseState *  pstate,
CreateSubscriptionStmt *  stmt,
bool  isTopLevel 
)

Definition at line 538 of file subscriptioncmds.c.

540 {
541  Relation rel;
542  ObjectAddress myself;
543  Oid subid;
544  bool nulls[Natts_pg_subscription];
545  Datum values[Natts_pg_subscription];
546  Oid owner = GetUserId();
547  HeapTuple tup;
548  char *conninfo;
549  char originname[NAMEDATALEN];
550  List *publications;
551  bits32 supported_opts;
552  SubOpts opts = {0};
553 
554  /*
555  * Parse and check options.
556  *
557  * Connection and publication should not be specified here.
558  */
559  supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
564  parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
565 
566  /*
567  * Since creating a replication slot is not transactional, rolling back
568  * the transaction leaves the created replication slot. So we cannot run
569  * CREATE SUBSCRIPTION inside a transaction block if creating a
570  * replication slot.
571  */
572  if (opts.create_slot)
573  PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
574 
575  if (!superuser())
576  ereport(ERROR,
577  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
578  errmsg("must be superuser to create subscriptions")));
579 
580  /*
581  * If built with appropriate switch, whine when regression-testing
582  * conventions for subscription names are violated.
583  */
584 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
585  if (strncmp(stmt->subname, "regress_", 8) != 0)
586  elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
587 #endif
588 
589  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
590 
591  /* Check if name is used */
592  subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
594  if (OidIsValid(subid))
595  {
596  ereport(ERROR,
598  errmsg("subscription \"%s\" already exists",
599  stmt->subname)));
600  }
601 
602  if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
603  opts.slot_name == NULL)
604  opts.slot_name = stmt->subname;
605 
606  /* The default for synchronous_commit of subscriptions is off. */
607  if (opts.synchronous_commit == NULL)
608  opts.synchronous_commit = "off";
609 
610  conninfo = stmt->conninfo;
611  publications = stmt->publication;
612 
613  /* Load the library providing us libpq calls. */
614  load_file("libpqwalreceiver", false);
615 
616  /* Check the connection info string. */
617  walrcv_check_conninfo(conninfo);
618 
619  /* Everything ok, form a new tuple. */
620  memset(values, 0, sizeof(values));
621  memset(nulls, false, sizeof(nulls));
622 
623  subid = GetNewOidWithIndex(rel, SubscriptionObjectIndexId,
624  Anum_pg_subscription_oid);
625  values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
626  values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
627  values[Anum_pg_subscription_subskiplsn - 1] = LSNGetDatum(InvalidXLogRecPtr);
628  values[Anum_pg_subscription_subname - 1] =
630  values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
631  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
632  values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
633  values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
634  values[Anum_pg_subscription_subtwophasestate - 1] =
635  CharGetDatum(opts.twophase ?
638  values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
639  values[Anum_pg_subscription_subconninfo - 1] =
640  CStringGetTextDatum(conninfo);
641  if (opts.slot_name)
642  values[Anum_pg_subscription_subslotname - 1] =
644  else
645  nulls[Anum_pg_subscription_subslotname - 1] = true;
646  values[Anum_pg_subscription_subsynccommit - 1] =
647  CStringGetTextDatum(opts.synchronous_commit);
648  values[Anum_pg_subscription_subpublications - 1] =
649  publicationListToArray(publications);
650  values[Anum_pg_subscription_suborigin - 1] =
651  CStringGetTextDatum(opts.origin);
652 
653  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
654 
655  /* Insert tuple into catalog. */
656  CatalogTupleInsert(rel, tup);
657  heap_freetuple(tup);
658 
659  recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
660 
661  ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
662  replorigin_create(originname);
663 
664  /*
665  * Connect to remote side to execute requested commands and fetch table
666  * info.
667  */
668  if (opts.connect)
669  {
670  char *err;
672  List *tables;
673  ListCell *lc;
674  char table_state;
675 
676  /* Try to connect to the publisher. */
677  wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
678  if (!wrconn)
679  ereport(ERROR,
680  (errcode(ERRCODE_CONNECTION_FAILURE),
681  errmsg("could not connect to the publisher: %s", err)));
682 
683  PG_TRY();
684  {
685  check_publications(wrconn, publications);
686  check_publications_origin(wrconn, publications, opts.copy_data,
687  opts.origin, NULL, 0, stmt->subname);
688 
689  /*
690  * Set sync state based on if we were asked to do data copy or
691  * not.
692  */
693  table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
694 
695  /*
696  * Get the table list from publisher and build local table status
697  * info.
698  */
699  tables = fetch_table_list(wrconn, publications);
700  foreach(lc, tables)
701  {
702  RangeVar *rv = (RangeVar *) lfirst(lc);
703  Oid relid;
704 
705  relid = RangeVarGetRelid(rv, AccessShareLock, false);
706 
707  /* Check for supported relkind. */
709  rv->schemaname, rv->relname);
710 
711  AddSubscriptionRelState(subid, relid, table_state,
713  }
714 
715  /*
716  * If requested, create permanent slot for the subscription. We
717  * won't use the initial snapshot for anything, so no need to
718  * export it.
719  */
720  if (opts.create_slot)
721  {
722  bool twophase_enabled = false;
723 
724  Assert(opts.slot_name);
725 
726  /*
727  * Even if two_phase is set, don't create the slot with
728  * two-phase enabled. Will enable it once all the tables are
729  * synced and ready. This avoids race-conditions like prepared
730  * transactions being skipped due to changes not being applied
731  * due to checks in should_apply_changes_for_rel() when
732  * tablesync for the corresponding tables are in progress. See
733  * comments atop worker.c.
734  *
735  * Note that if tables were specified but copy_data is false
736  * then it is safe to enable two_phase up-front because those
737  * tables are already initially in READY state. When the
738  * subscription has no tables, we leave the twophase state as
739  * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
740  * PUBLICATION to work.
741  */
742  if (opts.twophase && !opts.copy_data && tables != NIL)
743  twophase_enabled = true;
744 
745  walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
746  CRS_NOEXPORT_SNAPSHOT, NULL);
747 
748  if (twophase_enabled)
750 
751  ereport(NOTICE,
752  (errmsg("created replication slot \"%s\" on publisher",
753  opts.slot_name)));
754  }
755  }
756  PG_FINALLY();
757  {
759  }
760  PG_END_TRY();
761  }
762  else
764  (errmsg("subscription was created, but is not connected"),
765  errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
766 
768 
770 
771  if (opts.enabled)
773 
774  ObjectAddressSet(myself, SubscriptionRelationId, subid);
775 
776  InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
777 
778  return myself;
779 }
#define OidIsValid(objectId)
Definition: c.h:759
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:393
#define PG_TRY(...)
Definition: elog.h:370
#define WARNING
Definition: elog.h:36
#define PG_END_TRY(...)
Definition: elog.h:395
#define NOTICE
Definition: elog.h:35
#define PG_FINALLY(...)
Definition: elog.h:387
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:221
#define AccessShareLock
Definition: lockdefs.h:36
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1985
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:79
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:173
RepOriginId replorigin_create(const char *roname)
Definition: origin.c:252
#define lfirst(lc)
Definition: pg_list.h:172
#define NIL
Definition: pg_list.h:68
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:165
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
void pgstat_create_subscription(Oid subid)
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
char * relname
Definition: primnodes.h:74
char * schemaname
Definition: primnodes.h:71
#define SUBOPT_CREATE_SLOT
static void check_publications(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_TWOPHASE_COMMIT
static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_CONNECT
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:199
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition: tablesync.c:1583
static WalReceiverConn * wrconn
Definition: walreceiver.c:95
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
Definition: walreceiver.h:430
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:408
#define walrcv_disconnect(conn)
Definition: walreceiver.h:436
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References AccessShareLock, AddSubscriptionRelState(), ApplyLauncherWakeupAtCommit(), Assert(), BoolGetDatum(), CatalogTupleInsert(), CharGetDatum(), check_publications(), check_publications_origin(), CheckSubscriptionRelkind(), CreateSubscriptionStmt::conninfo, CRS_NOEXPORT_SNAPSHOT, CStringGetDatum(), CStringGetTextDatum, DirectFunctionCall1, elog(), ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errhint(), errmsg(), ERROR, fetch_table_list(), get_rel_relkind(), GetNewOidWithIndex(), GetSysCacheOid2, GetUserId(), heap_form_tuple(), heap_freetuple(), InvalidOid, InvalidXLogRecPtr, InvokeObjectPostCreateHook, IsSet, lfirst, load_file(), LOGICALREP_TWOPHASE_STATE_DISABLED, LOGICALREP_TWOPHASE_STATE_ENABLED, LOGICALREP_TWOPHASE_STATE_PENDING, LSNGetDatum(), MyDatabaseId, NAMEDATALEN, namein(), NIL, NOTICE, ObjectAddressSet, ObjectIdGetDatum(), OidIsValid, CreateSubscriptionStmt::options, opts, parse_subscription_options(), PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_create_subscription(), PreventInTransactionBlock(), CreateSubscriptionStmt::publication, publicationListToArray(), RangeVarGetRelid, recordDependencyOnOwner(), RelationGetDescr, RangeVar::relname, ReplicationOriginNameForLogicalRep(), replorigin_create(), RowExclusiveLock, RangeVar::schemaname, CreateSubscriptionStmt::subname, SUBOPT_BINARY, SUBOPT_CONNECT, SUBOPT_COPY_DATA, SUBOPT_CREATE_SLOT, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_ORIGIN, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, SUBSCRIPTIONNAME, superuser(), table_close(), table_open(), UpdateTwoPhaseState(), values, walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, WARNING, and wrconn.

Referenced by ProcessUtilitySlow().

◆ defGetStreamingMode()

char defGetStreamingMode ( DefElem * def)

Definition at line 2147 of file subscriptioncmds.c.

2148 {
2149  /*
2150  * If no parameter value given, assume "true" is meant.
2151  */
2152  if (!def->arg)
2153  return LOGICALREP_STREAM_ON;
2154 
2155  /*
2156  * Allow 0, 1, "false", "true", "off", "on" or "parallel".
2157  */
2158  switch (nodeTag(def->arg))
2159  {
2160  case T_Integer:
2161  switch (intVal(def->arg))
2162  {
2163  case 0:
2164  return LOGICALREP_STREAM_OFF;
2165  case 1:
2166  return LOGICALREP_STREAM_ON;
2167  default:
2168  /* otherwise, error out below */
2169  break;
2170  }
2171  break;
2172  default:
2173  {
2174  char *sval = defGetString(def);
2175 
2176  /*
2177  * The set of strings accepted here should match up with the
2178  * grammar's opt_boolean_or_string production.
2179  */
2180  if (pg_strcasecmp(sval, "false") == 0 ||
2181  pg_strcasecmp(sval, "off") == 0)
2182  return LOGICALREP_STREAM_OFF;
2183  if (pg_strcasecmp(sval, "true") == 0 ||
2184  pg_strcasecmp(sval, "on") == 0)
2185  return LOGICALREP_STREAM_ON;
2186  if (pg_strcasecmp(sval, "parallel") == 0)
2187  return LOGICALREP_STREAM_PARALLEL;
2188  }
2189  break;
2190  }
2191 
2192  ereport(ERROR,
2193  (errcode(ERRCODE_SYNTAX_ERROR),
2194  errmsg("%s requires a Boolean value or \"parallel\"",
2195  def->defname)));
2196  return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
2197 }
char * defGetString(DefElem *def)
Definition: define.c:49
#define nodeTag(nodeptr)
Definition: nodes.h:133
#define LOGICALREP_STREAM_ON
#define LOGICALREP_STREAM_OFF
#define LOGICALREP_STREAM_PARALLEL
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
char * defname
Definition: parsenodes.h:810
Node * arg
Definition: parsenodes.h:811
#define intVal(v)
Definition: value.h:79

References DefElem::arg, defGetString(), DefElem::defname, ereport, errcode(), errmsg(), ERROR, intVal, LOGICALREP_STREAM_OFF, LOGICALREP_STREAM_ON, LOGICALREP_STREAM_PARALLEL, nodeTag, and pg_strcasecmp().

Referenced by parse_output_parameters(), and parse_subscription_options().

◆ DropSubscription()

void DropSubscription ( DropSubscriptionStmt * stmt,
bool  isTopLevel 
)

Definition at line 1376 of file subscriptioncmds.c.

1377 {
1378  Relation rel;
1379  ObjectAddress myself;
1380  HeapTuple tup;
1381  Oid subid;
1382  Datum datum;
1383  bool isnull;
1384  char *subname;
1385  char *conninfo;
1386  char *slotname;
1387  List *subworkers;
1388  ListCell *lc;
1389  char originname[NAMEDATALEN];
1390  char *err = NULL;
1391  WalReceiverConn *wrconn;
1392  Form_pg_subscription form;
1393  List *rstates;
1394 
1395  /*
1396  * Lock pg_subscription with AccessExclusiveLock to ensure that the
1397  * launcher doesn't restart new worker during dropping the subscription
1398  */
1399  rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
1400 
1401  tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId,
1402  CStringGetDatum(stmt->subname));
1403 
1404  if (!HeapTupleIsValid(tup))
1405  {
1406  table_close(rel, NoLock);
1407 
1408  if (!stmt->missing_ok)
1409  ereport(ERROR,
1410  (errcode(ERRCODE_UNDEFINED_OBJECT),
1411  errmsg("subscription \"%s\" does not exist",
1412  stmt->subname)));
1413  else
1414  ereport(NOTICE,
1415  (errmsg("subscription \"%s\" does not exist, skipping",
1416  stmt->subname)));
1417 
1418  return;
1419  }
1420 
1421  form = (Form_pg_subscription) GETSTRUCT(tup);
1422  subid = form->oid;
1423 
1424  /* must be owner */
1425  if (!object_ownercheck(SubscriptionRelationId, subid, GetUserId()))
1426  aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
1427  stmt->subname);
1428 
1429  /* DROP hook for the subscription being removed */
1430  InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
1431 
1432  /*
1433  * Lock the subscription so nobody else can do anything with it (including
1434  * the replication workers).
1435  */
1436  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
1437 
1438  /* Get subname */
1439  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
1440  Anum_pg_subscription_subname, &isnull);
1441  Assert(!isnull);
1442  subname = pstrdup(NameStr(*DatumGetName(datum)));
1443 
1444  /* Get conninfo */
1445  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
1446  Anum_pg_subscription_subconninfo, &isnull);
1447  Assert(!isnull);
1448  conninfo = TextDatumGetCString(datum);
1449 
1450  /* Get slotname */
1451  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
1452  Anum_pg_subscription_subslotname, &isnull);
1453  if (!isnull)
1454  slotname = pstrdup(NameStr(*DatumGetName(datum)));
1455  else
1456  slotname = NULL;
1457 
1458  /*
1459  * Since dropping a replication slot is not transactional, the replication
1460  * slot stays dropped even if the transaction rolls back. So we cannot
1461  * run DROP SUBSCRIPTION inside a transaction block if dropping the
1462  * replication slot. Also, in this case, we report a message for dropping
1463  * the subscription to the cumulative stats system.
1464  *
1465  * XXX The command name should really be something like "DROP SUBSCRIPTION
1466  * of a subscription that is associated with a replication slot", but we
1467  * don't have the proper facilities for that.
1468  */
1469  if (slotname)
1470  PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
1471 
1472  ObjectAddressSet(myself, SubscriptionRelationId, subid);
1473  EventTriggerSQLDropAddObject(&myself, true, true);
1474 
1475  /* Remove the tuple from catalog. */
1476  CatalogTupleDelete(rel, &tup->t_self);
1477 
1478  ReleaseSysCache(tup);
1479 
1480  /*
1481  * Stop all the subscription workers immediately.
1482  *
1483  * This is necessary if we are dropping the replication slot, so that the
1484  * slot becomes accessible.
1485  *
1486  * It is also necessary if the subscription is disabled and was disabled
1487  * in the same transaction. Then the workers haven't seen the disabling
1488  * yet and will still be running, leading to hangs later when we want to
1489  * drop the replication origin. If the subscription was disabled before
1490  * this transaction, then there shouldn't be any workers left, so this
1491  * won't make a difference.
1492  *
1493  * New workers won't be started because we hold an exclusive lock on the
1494  * subscription till the end of the transaction.
1495  */
1496  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1497  subworkers = logicalrep_workers_find(subid, false);
1498  LWLockRelease(LogicalRepWorkerLock);
1499  foreach(lc, subworkers)
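1500  {
1501  LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
1502 
1503  logicalrep_worker_stop(w->subid, w->relid);
1504  }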
1505  list_free(subworkers);
1506 
1507  /*
1508  * Remove the no-longer-useful entry in the launcher's table of apply
1509  * worker start times.
1510  *
1511  * If this transaction rolls back, the launcher might restart a failed
1512  * apply worker before wal_retrieve_retry_interval milliseconds have
1513  * elapsed, but that's pretty harmless.
1514  */
1515  ApplyLauncherForgetWorkerStartTime(subid);
1516 
1517  /*
1518  * Cleanup of tablesync replication origins.
1519  *
1520  * Any READY-state relations would already have dealt with clean-ups.
1521  *
1522  * Note that the state can't change because we have already stopped both
1523  * the apply and tablesync workers and they can't restart because of
1524  * exclusive lock on the subscription.
1525  */
1526  rstates = GetSubscriptionRelations(subid, true);
1527  foreach(lc, rstates)
1528  {
1529  SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
1530  Oid relid = rstate->relid;
1531 
1532  /* Only cleanup resources of tablesync workers */
1533  if (!OidIsValid(relid))
1534  continue;
1535 
1536  /*
1537  * Drop the tablesync's origin tracking if exists.
1538  *
1539  * It is possible that the origin is not yet created for tablesync
1540  * worker so passing missing_ok = true. This can happen for the states
1541  * before SUBREL_STATE_FINISHEDCOPY.
1542  */
1543  ReplicationOriginNameForLogicalRep(subid, relid, originname,
1544  sizeof(originname));
1545  replorigin_drop_by_name(originname, true, false);
1546  }
1547 
1548  /* Clean up dependencies */
1549  deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
1550 
1551  /* Remove any associated relation synchronization states. */
1552  RemoveSubscriptionRel(subid, InvalidOid);
1553 
1554  /* Remove the origin tracking if exists. */
1555  ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
1556  replorigin_drop_by_name(originname, true, false);
1557 
1558  /*
1559  * If there is no slot associated with the subscription, we can finish
1560  * here.
1561  */
1562  if (!slotname && rstates == NIL)
1563  {
1564  table_close(rel, NoLock);
1565  return;
1566  }
1567 
1568  /*
1569  * Try to acquire the connection necessary for dropping slots.
1570  *
1571  * Note: If the slotname is NONE/NULL then we allow the command to finish
1572  * and users need to manually cleanup the apply and tablesync worker slots
1573  * later.
1574  *
1575  * This has to be at the end because otherwise if there is an error while
1576  * doing the database operations we won't be able to rollback dropped
1577  * slot.
1578  */
1579  load_file("libpqwalreceiver", false);
1580 
1581  wrconn = walrcv_connect(conninfo, true, subname, &err);
1582  if (wrconn == NULL)
1583  {
1584  if (!slotname)
1585  {
1586  /* be tidy */
1587  list_free(rstates);
1588  table_close(rel, NoLock);
1589  return;
1590  }
1591  else
1592  {
1593  ReportSlotConnectionError(rstates, subid, slotname, err);
1594  }
1595  }
1596 
1597  PG_TRY();
1598  {
1599  foreach(lc, rstates)
1600  {
1601  SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
1602  Oid relid = rstate->relid;
1603 
1604  /* Only cleanup resources of tablesync workers */
1605  if (!OidIsValid(relid))
1606  continue;
1607 
1608  /*
1609  * Drop the tablesync slots associated with removed tables.
1610  *
1611  * For SYNCDONE/READY states, the tablesync slot is known to have
1612  * already been dropped by the tablesync worker.
1613  *
1614  * For other states, there is no certainty, maybe the slot does
1615  * not exist yet. Also, if we fail after removing some of the
1616  * slots, next time, it will again try to drop already dropped
1617  * slots and fail. For these reasons, we allow missing_ok = true
1618  * for the drop.
1619  */
1620  if (rstate->state != SUBREL_STATE_SYNCDONE)
1621  {
1622  char syncslotname[NAMEDATALEN] = {0};
1623 
1624  ReplicationSlotNameForTablesync(subid, relid, syncslotname,
1625  sizeof(syncslotname));
1626  ReplicationSlotDropAtPubNode(wrconn, syncslotname, true);
1627  }
1628  }
1629 
1630  list_free(rstates);
1631 
1632  /*
1633  * If there is a slot associated with the subscription, then drop the
1634  * replication slot at the publisher.
1635  */
1636  if (slotname)
1637  ReplicationSlotDropAtPubNode(wrconn, slotname, false);
1638  }
1639  PG_FINALLY();
1640  {
1641  walrcv_disconnect(wrconn);
1642  }
1643  PG_END_TRY();
1644 
1645  /*
1646  * Tell the cumulative stats system that the subscription is getting
1647  * dropped.
1648  */
1649  pgstat_drop_subscription(subid);
1650 
1651  table_close(rel, NoLock);
1652 }
#define TextDatumGetCString(d)
Definition: builtins.h:95
#define NameStr(name)
Definition: c.h:730
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:350
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:281
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:594
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition: launcher.c:1031
void list_free(List *list)
Definition: list.c:1545
#define NoLock
Definition: lockdefs.h:34
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1195
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1803
@ LW_SHARED
Definition: lwlock.h:116
char * pstrdup(const char *in)
Definition: mcxt.c:1624
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:182
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition: origin.c:411
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:1002
void RemoveSubscriptionRel(Oid subid, Oid relid)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
NameData subname
void pgstat_drop_subscription(Oid subid)
static Name DatumGetName(Datum X)
Definition: postgres.h:360
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:865
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1078
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:828
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition: tablesync.c:1213

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, ApplyLauncherForgetWorkerStartTime(), Assert(), CatalogTupleDelete(), CStringGetDatum(), DatumGetName(), deleteSharedDependencyRecordsFor(), ereport, errcode(), errmsg(), ERROR, EventTriggerSQLDropAddObject(), GETSTRUCT, GetSubscriptionRelations(), GetUserId(), HeapTupleIsValid, InvalidOid, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), LW_SHARED, LWLockAcquire(), LWLockRelease(), DropSubscriptionStmt::missing_ok, MyDatabaseId, NAMEDATALEN, NameStr, NIL, NoLock, NOTICE, object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, OidIsValid, PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_drop_subscription(), PreventInTransactionBlock(), pstrdup(), ReleaseSysCache(), SubscriptionRelState::relid, LogicalRepWorker::relid, RemoveSubscriptionRel(), ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), ReportSlotConnectionError(), SearchSysCache2(), SubscriptionRelState::state, LogicalRepWorker::subid, subname, DropSubscriptionStmt::subname, SUBSCRIPTIONNAME, SUBSCRIPTIONOID, SysCacheGetAttr(), HeapTupleData::t_self, table_close(), table_open(), TextDatumGetCString, walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by ProcessUtilitySlow().