PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
subscriptioncmds.h File Reference
Include dependency graph for subscriptioncmds.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

ObjectAddress CreateSubscription (CreateSubscriptionStmt *stmt)
 
ObjectAddress AlterSubscription (AlterSubscriptionStmt *stmt)
 
void DropSubscription (DropSubscriptionStmt *stmt)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 

Function Documentation

ObjectAddress AlterSubscription ( AlterSubscriptionStmt stmt)

Definition at line 338 of file subscriptioncmds.c.

References ACL_KIND_SUBSCRIPTION, aclcheck_error(), ACLCHECK_NOT_OWNER, Anum_pg_subscription_subconninfo, Anum_pg_subscription_subenabled, Anum_pg_subscription_subpublications, Anum_pg_subscription_subslotname, BoolGetDatum, CatalogTupleUpdate(), CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, ereport, errcode(), errmsg(), ERROR, GetUserId(), heap_close, heap_freetuple(), heap_modify_tuple(), heap_open(), HeapTupleGetOid, HeapTupleIsValid, InvokeObjectPostAlterHook, MyDatabaseId, namein(), Natts_pg_subscription, NIL, NULL, ObjectAddressSet, AlterSubscriptionStmt::options, parse_subscription_options(), pg_subscription_ownercheck(), publicationListToArray(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy2, AlterSubscriptionStmt::subname, SUBSCRIPTIONNAME, SubscriptionRelationId, HeapTupleData::t_self, and values.

Referenced by ProcessUtilitySlow().

339 {
340  Relation rel;
341  ObjectAddress myself;
342  bool nulls[Natts_pg_subscription];
343  bool replaces[Natts_pg_subscription];
345  HeapTuple tup;
346  Oid subid;
347  bool enabled_given;
348  bool enabled;
349  char *conninfo;
350  char *slot_name;
351  List *publications;
352 
354 
355  /* Fetch the existing tuple. */
357  CStringGetDatum(stmt->subname));
358 
359  if (!HeapTupleIsValid(tup))
360  ereport(ERROR,
361  (errcode(ERRCODE_UNDEFINED_OBJECT),
362  errmsg("subscription \"%s\" does not exist",
363  stmt->subname)));
364 
365  /* must be owner */
368  stmt->subname);
369 
370  subid = HeapTupleGetOid(tup);
371 
372  /* Parse options. */
373  parse_subscription_options(stmt->options, &conninfo, &publications,
374  &enabled_given, &enabled,
375  NULL, &slot_name);
376 
377  /* Form a new tuple. */
378  memset(values, 0, sizeof(values));
379  memset(nulls, false, sizeof(nulls));
380  memset(replaces, false, sizeof(replaces));
381 
382  if (enabled_given)
383  {
384  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
385  replaces[Anum_pg_subscription_subenabled - 1] = true;
386  }
387  if (conninfo)
388  {
390  CStringGetTextDatum(conninfo);
391  replaces[Anum_pg_subscription_subconninfo - 1] = true;
392  }
393  if (slot_name)
394  {
397  replaces[Anum_pg_subscription_subslotname - 1] = true;
398  }
399  if (publications != NIL)
400  {
402  publicationListToArray(publications);
403  replaces[Anum_pg_subscription_subpublications - 1] = true;
404  }
405 
406  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
407  replaces);
408 
409  /* Update the catalog. */
410  CatalogTupleUpdate(rel, &tup->t_self, tup);
411 
413 
414  /* Cleanup. */
415  heap_freetuple(tup);
417 
419 
420  return myself;
421 }
#define NIL
Definition: pg_list.h:69
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:46
#define Anum_pg_subscription_subpublications
#define RelationGetDescr(relation)
Definition: rel.h:425
Oid GetUserId(void)
Definition: miscinit.c:283
int errcode(int sqlerrcode)
Definition: elog.c:575
static Datum publicationListToArray(List *publist)
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:555
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1374
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define Natts_pg_subscription
ItemPointerData t_self
Definition: htup.h:65
static void parse_subscription_options(List *options, char **conninfo, List **publications, bool *enabled_given, bool *enabled, bool *create_slot, char **slot_name)
#define SubscriptionRelationId
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3378
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:586
#define ereport(elevel, rest)
Definition: elog.h:122
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:163
#define Anum_pg_subscription_subslotname
uintptr_t Datum
Definition: postgres.h:374
Oid MyDatabaseId
Definition: globals.c:76
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1287
#define BoolGetDatum(X)
Definition: postgres.h:410
#define Anum_pg_subscription_subconninfo
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:226
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:210
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define Anum_pg_subscription_subenabled
static Datum values[MAXATTR]
Definition: bootstrap.c:162
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CStringGetTextDatum(s)
Definition: builtins.h:90
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:160
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:793
Definition: pg_list.h:45
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5085
ObjectAddress AlterSubscriptionOwner ( const char *  name,
Oid  newOwnerId 
)

Definition at line 610 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), CStringGetDatum, ereport, errcode(), errmsg(), ERROR, heap_close, heap_freetuple(), heap_open(), HeapTupleGetOid, HeapTupleIsValid, MyDatabaseId, ObjectAddressSet, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONNAME, and SubscriptionRelationId.

Referenced by ExecAlterOwnerStmt().

611 {
612  Oid subid;
613  HeapTuple tup;
614  Relation rel;
615  ObjectAddress address;
616 
618 
621 
622  if (!HeapTupleIsValid(tup))
623  ereport(ERROR,
624  (errcode(ERRCODE_UNDEFINED_OBJECT),
625  errmsg("subscription \"%s\" does not exist", name)));
626 
627  subid = HeapTupleGetOid(tup);
628 
629  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
630 
631  ObjectAddressSet(address, SubscriptionRelationId, subid);
632 
633  heap_freetuple(tup);
634 
636 
637  return address;
638 }
int errcode(int sqlerrcode)
Definition: elog.c:575
#define heap_close(r, l)
Definition: heapam.h:97
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1374
unsigned int Oid
Definition: postgres_ext.h:31
#define ERROR
Definition: elog.h:43
#define SubscriptionRelationId
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:586
#define ereport(elevel, rest)
Definition: elog.h:122
Oid MyDatabaseId
Definition: globals.c:76
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1287
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
const char * name
Definition: encode.c:521
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:160
void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 644 of file subscriptioncmds.c.

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg(), ERROR, heap_close, heap_freetuple(), heap_open(), HeapTupleIsValid, ObjectIdGetDatum, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, and SubscriptionRelationId.

Referenced by shdepReassignOwned().

645 {
646  HeapTuple tup;
647  Relation rel;
648 
650 
652 
653  if (!HeapTupleIsValid(tup))
654  ereport(ERROR,
655  (errcode(ERRCODE_UNDEFINED_OBJECT),
656  errmsg("subscription with OID %u does not exist", subid)));
657 
658  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
659 
660  heap_freetuple(tup);
661 
663 }
int errcode(int sqlerrcode)
Definition: elog.c:575
#define heap_close(r, l)
Definition: heapam.h:97
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1374
#define ObjectIdGetDatum(X)
Definition: postgres.h:515
#define ERROR
Definition: elog.h:43
#define SubscriptionRelationId
#define RowExclusiveLock
Definition: lockdefs.h:38
#define ereport(elevel, rest)
Definition: elog.h:122
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1287
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:158
int errmsg(const char *fmt,...)
Definition: elog.c:797
ObjectAddress CreateSubscription ( CreateSubscriptionStmt stmt)

Definition at line 207 of file subscriptioncmds.c.

References Anum_pg_subscription_subconninfo, Anum_pg_subscription_subdbid, Anum_pg_subscription_subenabled, Anum_pg_subscription_subname, Anum_pg_subscription_subowner, Anum_pg_subscription_subpublications, Anum_pg_subscription_subslotname, ApplyLauncherWakeupAtCommit(), BoolGetDatum, CatalogTupleInsert(), CreateSubscriptionStmt::conninfo, CStringGetDatum, CStringGetTextDatum, DirectFunctionCall1, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg(), ERROR, GetSysCacheOid2, GetUserId(), heap_close, heap_form_tuple(), heap_freetuple(), heap_open(), InvokeObjectPostCreateHook, load_file(), MyDatabaseId, NAMEDATALEN, namein(), Natts_pg_subscription, NOTICE, NULL, ObjectAddressSet, ObjectIdGetDatum, OidIsValid, CreateSubscriptionStmt::options, parse_subscription_options(), PG_CATCH, PG_END_TRY, PG_RE_THROW, PG_TRY, CreateSubscriptionStmt::publication, publicationListToArray(), recordDependencyOnOwner(), RelationGetDescr, replorigin_create(), RowExclusiveLock, snprintf(), CreateSubscriptionStmt::subname, SUBSCRIPTIONNAME, SubscriptionRelationId, superuser(), values, walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, and wrconn.

Referenced by ProcessUtilitySlow().

208 {
209  Relation rel;
210  ObjectAddress myself;
211  Oid subid;
212  bool nulls[Natts_pg_subscription];
214  Oid owner = GetUserId();
215  HeapTuple tup;
216  bool enabled_given;
217  bool enabled;
218  char *conninfo;
219  char *slotname;
220  char originname[NAMEDATALEN];
221  bool create_slot;
222  List *publications;
223 
224  if (!superuser())
225  ereport(ERROR,
226  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
227  (errmsg("must be superuser to create subscriptions"))));
228 
230 
231  /* Check if name is used */
233  CStringGetDatum(stmt->subname));
234  if (OidIsValid(subid))
235  {
236  ereport(ERROR,
238  errmsg("subscription \"%s\" already exists",
239  stmt->subname)));
240  }
241 
242  /*
243  * Parse and check options.
244  * Connection and publication should not be specified here.
245  */
247  &enabled_given, &enabled,
248  &create_slot, &slotname);
249  if (slotname == NULL)
250  slotname = stmt->subname;
251 
252  conninfo = stmt->conninfo;
253  publications = stmt->publication;
254 
255  /* Load the library providing us libpq calls. */
256  load_file("libpqwalreceiver", false);
257 
258  /* Check the connection info string. */
259  walrcv_check_conninfo(conninfo);
260 
261  /* Everything ok, form a new tuple. */
262  memset(values, 0, sizeof(values));
263  memset(nulls, false, sizeof(nulls));
264 
266  values[Anum_pg_subscription_subname - 1] =
268  values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
269  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
271  CStringGetTextDatum(conninfo);
275  publicationListToArray(publications);
276 
277  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
278 
279  /* Insert tuple into catalog. */
280  subid = CatalogTupleInsert(rel, tup);
281  heap_freetuple(tup);
282 
284 
285  snprintf(originname, sizeof(originname), "pg_%u", subid);
286  replorigin_create(originname);
287 
288  /*
289  * If requested, create the replication slot on remote side for our
290  * newly created subscription.
291  */
292  if (create_slot)
293  {
294  XLogRecPtr lsn;
295  char *err;
297 
298  /* Try to connect to the publisher. */
299  wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
300  if (!wrconn)
301  ereport(ERROR,
302  (errmsg("could not connect to the publisher: %s", err)));
303 
304  PG_TRY();
305  {
306  walrcv_create_slot(wrconn, slotname, false, &lsn);
307  ereport(NOTICE,
308  (errmsg("created replication slot \"%s\" on publisher",
309  slotname)));
310  }
311  PG_CATCH();
312  {
313  /* Close the connection in case of failure. */
314  walrcv_disconnect(wrconn);
315  PG_RE_THROW();
316  }
317  PG_END_TRY();
318 
319  /* And we are done with the remote side. */
320  walrcv_disconnect(wrconn);
321  }
322 
324 
326 
328 
330 
331  return myself;
332 }
WalReceiverConn * wrconn
Definition: worker.c:106
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:46
#define Anum_pg_subscription_subpublications
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:145
#define Anum_pg_subscription_subowner
#define RelationGetDescr(relation)
Definition: rel.h:425
Oid GetUserId(void)
Definition: miscinit.c:283
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:211
int errcode(int sqlerrcode)
Definition: elog.c:575
bool superuser(void)
Definition: superuser.c:47
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
static Datum publicationListToArray(List *publist)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:555
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:158
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1374
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:534
#define GetSysCacheOid2(cacheId, key1, key2)
Definition: syscache.h:178
#define NAMEDATALEN
#define ObjectIdGetDatum(X)
Definition: postgres.h:515
#define ERROR
Definition: elog.h:43
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:162
#define Natts_pg_subscription
#define Anum_pg_subscription_subname
static void parse_subscription_options(List *options, char **conninfo, List **publications, bool *enabled_given, bool *enabled, bool *create_slot, char **slot_name)
#define SubscriptionRelationId
#define RowExclusiveLock
Definition: lockdefs.h:38
#define CStringGetDatum(X)
Definition: postgres.h:586
RepOriginId replorigin_create(char *roname)
Definition: origin.c:235
#define ereport(elevel, rest)
Definition: elog.h:122
#define Anum_pg_subscription_subslotname
#define Anum_pg_subscription_subdbid
uintptr_t Datum
Definition: postgres.h:374
Oid MyDatabaseId
Definition: globals.c:76
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1287
#define BoolGetDatum(X)
Definition: postgres.h:410
#define NOTICE
Definition: elog.h:37
#define Anum_pg_subscription_subconninfo
#define PG_CATCH()
Definition: elog.h:293
#define NULL
Definition: c.h:226
uint64 XLogRecPtr
Definition: xlogdefs.h:21
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:133
#define PG_RE_THROW()
Definition: elog.h:314
#define walrcv_disconnect(conn)
Definition: walreceiver.h:231
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define Anum_pg_subscription_subenabled
static Datum values[MAXATTR]
Definition: bootstrap.c:162
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define CStringGetTextDatum(s)
Definition: builtins.h:90
#define walrcv_create_slot(conn, slotname, temporary, lsn)
Definition: walreceiver.h:227
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:541
#define PG_TRY()
Definition: elog.h:284
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:34
Definition: pg_list.h:45
#define PG_END_TRY()
Definition: elog.h:300
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:209
void DropSubscription ( DropSubscriptionStmt stmt)

Definition at line 427 of file subscriptioncmds.c.

References AccessExclusiveLock, ACL_KIND_SUBSCRIPTION, aclcheck_error(), ACLCHECK_NOT_OWNER, Anum_pg_subscription_subconninfo, Anum_pg_subscription_subname, Anum_pg_subscription_subslotname, appendStringInfo(), Assert, CatalogTupleDelete(), CStringGetDatum, StringInfoData::data, DatumGetName, deleteSharedDependencyRecordsFor(), DropSubscriptionStmt::drop_slot, ereport, errcode(), errdetail(), errmsg(), ERROR, EventTriggerSQLDropAddObject(), GetUserId(), heap_close, heap_open(), HeapTupleGetOid, HeapTupleIsValid, initStringInfo(), InvalidRepOriginId, InvokeObjectDropHook, load_file(), LockSharedObject(), logicalrep_worker_stop(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), DropSubscriptionStmt::missing_ok, MyDatabaseId, NAMEDATALEN, NameStr, NoLock, NOTICE, NULL, ObjectAddressSet, pfree(), pg_subscription_ownercheck(), pstrdup(), ReleaseSysCache(), replorigin_by_name(), replorigin_drop(), RowExclusiveLock, SearchSysCache2, snprintf(), DropSubscriptionStmt::subname, SUBSCRIPTIONNAME, SUBSCRIPTIONOID, SubscriptionRelationId, SysCacheGetAttr(), HeapTupleData::t_self, TextDatumGetCString, walrcv_command, walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by ProcessUtilitySlow().

428 {
429  Relation rel;
430  ObjectAddress myself;
431  HeapTuple tup;
432  Oid subid;
433  Datum datum;
434  bool isnull;
435  char *subname;
436  char *conninfo;
437  char *slotname;
438  char originname[NAMEDATALEN];
439  char *err = NULL;
440  RepOriginId originid;
442  StringInfoData cmd;
443 
445 
447  CStringGetDatum(stmt->subname));
448 
449  if (!HeapTupleIsValid(tup))
450  {
451  heap_close(rel, NoLock);
452 
453  if (!stmt->missing_ok)
454  ereport(ERROR,
455  (errcode(ERRCODE_UNDEFINED_OBJECT),
456  errmsg("subscription \"%s\" does not exist",
457  stmt->subname)));
458  else
459  ereport(NOTICE,
460  (errmsg("subscription \"%s\" does not exist, skipping",
461  stmt->subname)));
462 
463  return;
464  }
465 
466  subid = HeapTupleGetOid(tup);
467 
468  /* must be owner */
469  if (!pg_subscription_ownercheck(subid, GetUserId()))
471  stmt->subname);
472 
473  /* DROP hook for the subscription being removed */
475 
476  /*
477  * Lock the subscription so nobody else can do anything with it
478  * (including the replication workers).
479  */
481 
482  /* Get subname */
483  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
485  Assert(!isnull);
486  subname = pstrdup(NameStr(*DatumGetName(datum)));
487 
488  /* Get conninfo */
489  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
491  Assert(!isnull);
492  conninfo = pstrdup(TextDatumGetCString(datum));
493 
494  /* Get slotname */
495  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
497  Assert(!isnull);
498  slotname = pstrdup(NameStr(*DatumGetName(datum)));
499 
501  EventTriggerSQLDropAddObject(&myself, true, true);
502 
503  /* Remove the tuple from catalog. */
504  CatalogTupleDelete(rel, &tup->t_self);
505 
506  ReleaseSysCache(tup);
507 
508  /* Clean up dependencies */
510 
511  /* Protect against launcher restarting the worker. */
512  LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
513 
514  /* Kill the apply worker so that the slot becomes accessible. */
515  logicalrep_worker_stop(subid);
516 
517  LWLockRelease(LogicalRepLauncherLock);
518 
519  /* Remove the origin tracking if exists. */
520  snprintf(originname, sizeof(originname), "pg_%u", subid);
521  originid = replorigin_by_name(originname, true);
522  if (originid != InvalidRepOriginId)
523  replorigin_drop(originid);
524 
525  /* If the user asked to not drop the slot, we are done mow.*/
526  if (!stmt->drop_slot)
527  {
528  heap_close(rel, NoLock);
529  return;
530  }
531 
532  /*
533  * Otherwise drop the replication slot at the publisher node using
534  * the replication connection.
535  */
536  load_file("libpqwalreceiver", false);
537 
538  initStringInfo(&cmd);
539  appendStringInfo(&cmd, "DROP_REPLICATION_SLOT \"%s\"", slotname);
540 
541  wrconn = walrcv_connect(conninfo, true, subname, &err);
542  if (wrconn == NULL)
543  ereport(ERROR,
544  (errmsg("could not connect to publisher when attempting to "
545  "drop the replication slot \"%s\"", slotname),
546  errdetail("The error was: %s", err)));
547 
548  if (!walrcv_command(wrconn, cmd.data, &err))
549  {
550  /* Close the connection in case of failure */
551  walrcv_disconnect(wrconn);
552  ereport(ERROR,
553  (errmsg("could not drop the replication slot \"%s\" on publisher",
554  slotname),
555  errdetail("The error was: %s", err)));
556  }
557  else
558  ereport(NOTICE,
559  (errmsg("dropped replication slot \"%s\" on publisher",
560  slotname)));
561 
562  walrcv_disconnect(wrconn);
563 
564  pfree(cmd.data);
565 
566  heap_close(rel, NoLock);
567 }
#define walrcv_command(conn, cmd, err)
Definition: walreceiver.h:229
WalReceiverConn * wrconn
Definition: worker.c:106
Oid GetUserId(void)
Definition: miscinit.c:283
char * pstrdup(const char *in)
Definition: mcxt.c:1165
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:154
uint16 RepOriginId
Definition: xlogdefs.h:51
int errcode(int sqlerrcode)
Definition: elog.c:575
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:255
#define heap_close(r, l)
Definition: heapam.h:97
unsigned int Oid
Definition: postgres_ext.h:31
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1714
#define NAMEDATALEN
#define DatumGetName(X)
Definition: postgres.h:593
void pfree(void *pointer)
Definition: mcxt.c:992
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:110
#define ERROR
Definition: elog.h:43
#define Anum_pg_subscription_subname
ItemPointerData t_self
Definition: htup.h:65
#define SubscriptionRelationId
#define NoLock
Definition: lockdefs.h:34
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3378
#define RowExclusiveLock
Definition: lockdefs.h:38
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define CStringGetDatum(X)
Definition: postgres.h:586
#define ereport(elevel, rest)
Definition: elog.h:122
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:822
#define Anum_pg_subscription_subslotname
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
void replorigin_drop(RepOriginId roident)
Definition: origin.c:327
#define TextDatumGetCString(d)
Definition: builtins.h:91
uintptr_t Datum
Definition: postgres.h:374
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1083
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1245
Oid MyDatabaseId
Definition: globals.c:76
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1287
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:871
#define NOTICE
Definition: elog.h:37
#define Anum_pg_subscription_subconninfo
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:226
#define Assert(condition)
Definition: c.h:671
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:133
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1110
#define walrcv_disconnect(conn)
Definition: walreceiver.h:231
#define InvalidRepOriginId
Definition: origin.h:34
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define AccessExclusiveLock
Definition: lockdefs.h:46
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define NameStr(name)
Definition: c.h:495
void logicalrep_worker_stop(Oid subid)
Definition: launcher.c:313
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5085
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:209
#define SearchSysCache2(cacheId, key1, key2)
Definition: syscache.h:151