PostgreSQL Source Code git master
Loading...
Searching...
No Matches
subscriptioncmds.h File Reference
Include dependency graph for subscriptioncmds.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

ObjectAddress CreateSubscription (ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscription (ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 
char defGetStreamingMode (DefElem *def)
 
void CheckSubDeadTupleRetention (bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
 

Function Documentation

◆ AlterSubscription()

ObjectAddress AlterSubscription ( ParseState pstate,
AlterSubscriptionStmt stmt,
bool  isTopLevel 
)
extern

Definition at line 1333 of file subscriptioncmds.c.

1335{
1336 Relation rel;
1338 bool nulls[Natts_pg_subscription];
1341 HeapTuple tup;
1342 Oid subid;
1343 bool update_tuple = false;
1344 bool update_failover = false;
1345 bool update_two_phase = false;
1346 bool check_pub_rdt = false;
1347 bool retain_dead_tuples;
1348 int max_retention;
1349 bool retention_active;
1350 char *origin;
1351 Subscription *sub;
1354 SubOpts opts = {0};
1355
1357
1358 /* Fetch the existing tuple. */
1360 CStringGetDatum(stmt->subname));
1361
1362 if (!HeapTupleIsValid(tup))
1363 ereport(ERROR,
1365 errmsg("subscription \"%s\" does not exist",
1366 stmt->subname)));
1367
1369 subid = form->oid;
1370
1371 /* must be owner */
1374 stmt->subname);
1375
1376 sub = GetSubscription(subid, false);
1377
1379 origin = sub->origin;
1382
1383 /*
1384 * Don't allow non-superuser modification of a subscription with
1385 * password_required=false.
1386 */
1387 if (!sub->passwordrequired && !superuser())
1388 ereport(ERROR,
1390 errmsg("password_required=false is superuser-only"),
1391 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1392
1393 /* Lock the subscription so nobody else can do anything with it. */
1395
1396 /* Form a new tuple. */
1397 memset(values, 0, sizeof(values));
1398 memset(nulls, false, sizeof(nulls));
1399 memset(replaces, false, sizeof(replaces));
1400
1401 switch (stmt->kind)
1402 {
1404 {
1414
1415 parse_subscription_options(pstate, stmt->options,
1417
1418 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1419 {
1420 /*
1421 * The subscription must be disabled to allow slot_name as
1422 * 'none', otherwise, the apply worker will repeatedly try
1423 * to stream the data using that slot_name which neither
1424 * exists on the publisher nor the user will be allowed to
1425 * create it.
1426 */
1427 if (sub->enabled && !opts.slot_name)
1428 ereport(ERROR,
1430 errmsg("cannot set %s for enabled subscription",
1431 "slot_name = NONE")));
1432
1433 if (opts.slot_name)
1436 else
1437 nulls[Anum_pg_subscription_subslotname - 1] = true;
1439 }
1440
1441 if (opts.synchronous_commit)
1442 {
1444 CStringGetTextDatum(opts.synchronous_commit);
1446 }
1447
1448 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1449 {
1451 BoolGetDatum(opts.binary);
1453 }
1454
1455 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1456 {
1458 CharGetDatum(opts.streaming);
1460 }
1461
1462 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1463 {
1465 = BoolGetDatum(opts.disableonerr);
1467 = true;
1468 }
1469
1470 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1471 {
1472 /* Non-superuser may not disable password_required. */
1473 if (!opts.passwordrequired && !superuser())
1474 ereport(ERROR,
1476 errmsg("password_required=false is superuser-only"),
1477 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1478
1480 = BoolGetDatum(opts.passwordrequired);
1482 = true;
1483 }
1484
1485 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1486 {
1488 BoolGetDatum(opts.runasowner);
1490 }
1491
1492 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1493 {
1494 /*
1495 * We need to update both the slot and the subscription
1496 * for the two_phase option. We can enable the two_phase
1497 * option for a slot only once the initial data
1498 * synchronization is done. This is to avoid missing some
1499 * data as explained in comments atop worker.c.
1500 */
1501 update_two_phase = !opts.twophase;
1502
1503 CheckAlterSubOption(sub, "two_phase", update_two_phase,
1504 isTopLevel);
1505
1506 /*
1507 * Modifying the two_phase slot option requires a slot
1508 * lookup by slot name, so changing the slot name at the
1509 * same time is not allowed.
1510 */
1511 if (update_two_phase &&
1512 IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1513 ereport(ERROR,
1515 errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1516
1517 /*
1518 * Note that workers may still survive even if the
1519 * subscription has been disabled.
1520 *
1521 * Ensure workers have already been exited to avoid
1522 * getting prepared transactions while we are disabling
1523 * the two_phase option. Otherwise, the changes of an
1524 * already prepared transaction can be replicated again
1525 * along with its corresponding commit, leading to
1526 * duplicate data or errors.
1527 */
1528 if (logicalrep_workers_find(subid, true, true))
1529 ereport(ERROR,
1531 errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1532 errhint("Try again after some time.")));
1533
1534 /*
1535 * two_phase cannot be disabled if there are any
1536 * uncommitted prepared transactions present otherwise it
1537 * can lead to duplicate data or errors as explained in
1538 * the comment above.
1539 */
1540 if (update_two_phase &&
1542 LookupGXactBySubid(subid))
1543 ereport(ERROR,
1545 errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1546 errhint("Resolve these transactions and try again.")));
1547
1548 /* Change system catalog accordingly */
1550 CharGetDatum(opts.twophase ?
1554 }
1555
1556 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1557 {
1558 /*
1559 * Similar to the two_phase case above, we need to update
1560 * the failover option for both the slot and the
1561 * subscription.
1562 */
1563 update_failover = true;
1564
1565 CheckAlterSubOption(sub, "failover", update_failover,
1566 isTopLevel);
1567
1569 BoolGetDatum(opts.failover);
1571 }
1572
1573 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1574 {
1576 BoolGetDatum(opts.retaindeadtuples);
1578
1579 /*
1580 * Update the retention status only if there's a change in
1581 * the retain_dead_tuples option value.
1582 *
1583 * Automatically marking retention as active when
1584 * retain_dead_tuples is enabled may not always be ideal,
1585 * especially if retention was previously stopped and the
1586 * user toggles retain_dead_tuples without adjusting the
1587 * publisher workload. However, this behavior provides a
1588 * convenient way for users to manually refresh the
1589 * retention status. Since retention will be stopped again
1590 * unless the publisher workload is reduced, this approach
1591 * is acceptable for now.
1592 */
1593 if (opts.retaindeadtuples != sub->retaindeadtuples)
1594 {
1596 BoolGetDatum(opts.retaindeadtuples);
1598
1599 retention_active = opts.retaindeadtuples;
1600 }
1601
1602 CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1603
1604 /*
1605 * Workers may continue running even after the
1606 * subscription has been disabled.
1607 *
1608 * To prevent race conditions (as described in
1609 * CheckAlterSubOption()), ensure that all worker
1610 * processes have already exited before proceeding.
1611 */
1612 if (logicalrep_workers_find(subid, true, true))
1613 ereport(ERROR,
1615 errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1616 errhint("Try again after some time.")));
1617
1618 /*
1619 * Notify the launcher to manage the replication slot for
1620 * conflict detection. This ensures that replication slot
1621 * is efficiently handled (created, updated, or dropped)
1622 * in response to any configuration changes.
1623 */
1625
1626 check_pub_rdt = opts.retaindeadtuples;
1627 retain_dead_tuples = opts.retaindeadtuples;
1628 }
1629
1630 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1631 {
1633 Int32GetDatum(opts.maxretention);
1635
1636 max_retention = opts.maxretention;
1637 }
1638
1639 /*
1640 * Ensure that system configuration parameters are set
1641 * appropriately to support retain_dead_tuples and
1642 * max_retention_duration.
1643 */
1644 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1645 IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1649 (max_retention > 0));
1650
1651 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1652 {
1654 CStringGetTextDatum(opts.origin);
1656
1657 /*
1658 * Check if changes from different origins may be received
1659 * from the publisher when the origin is changed to ANY
1660 * and retain_dead_tuples is enabled.
1661 */
1664
1665 origin = opts.origin;
1666 }
1667
1668 update_tuple = true;
1669 break;
1670 }
1671
1673 {
1674 parse_subscription_options(pstate, stmt->options,
1676 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1677
1678 if (!sub->slotname && opts.enabled)
1679 ereport(ERROR,
1681 errmsg("cannot enable subscription that does not have a slot name")));
1682
1683 /*
1684 * Check track_commit_timestamp only when enabling the
1685 * subscription in case it was disabled after creation. See
1686 * comments atop CheckSubDeadTupleRetention() for details.
1687 */
1688 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1690 sub->retentionactive, false);
1691
1693 BoolGetDatum(opts.enabled);
1695
1696 if (opts.enabled)
1698
1699 update_tuple = true;
1700
1701 /*
1702 * The subscription might be initially created with
1703 * connect=false and retain_dead_tuples=true, meaning the
1704 * remote server's status may not be checked. Ensure this
1705 * check is conducted now.
1706 */
1707 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1708 break;
1709 }
1710
1712 /* Load the library providing us libpq calls. */
1713 load_file("libpqwalreceiver", false);
1714 /* Check the connection info string. */
1715 walrcv_check_conninfo(stmt->conninfo,
1716 sub->passwordrequired && !sub->ownersuperuser);
1717
1719 CStringGetTextDatum(stmt->conninfo);
1721 update_tuple = true;
1722
1723 /*
1724 * Since the remote server configuration might have changed,
1725 * perform a check to ensure it permits enabling
1726 * retain_dead_tuples.
1727 */
1729 break;
1730
1732 {
1734 parse_subscription_options(pstate, stmt->options,
1736
1738 publicationListToArray(stmt->publication);
1740
1741 update_tuple = true;
1742
1743 /* Refresh if user asked us to. */
1744 if (opts.refresh)
1745 {
1746 if (!sub->enabled)
1747 ereport(ERROR,
1749 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1750 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1751
1752 /*
1753 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1754 * why this is not allowed.
1755 */
1756 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1757 ereport(ERROR,
1759 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1760 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1761
1762 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1763
1764 /* Make sure refresh sees the new list of publications. */
1765 sub->publications = stmt->publication;
1766
1767 AlterSubscription_refresh(sub, opts.copy_data,
1768 stmt->publication);
1769 }
1770
1771 break;
1772 }
1773
1776 {
1777 List *publist;
1779
1781 parse_subscription_options(pstate, stmt->options,
1783
1784 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1788
1789 update_tuple = true;
1790
1791 /* Refresh if user asked us to. */
1792 if (opts.refresh)
1793 {
1794 /* We only need to validate user specified publications. */
1795 List *validate_publications = (isadd) ? stmt->publication : NULL;
1796
1797 if (!sub->enabled)
1798 ereport(ERROR,
1800 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1801 /* translator: %s is an SQL ALTER command */
1802 errhint("Use %s instead.",
1803 isadd ?
1804 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1805 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1806
1807 /*
1808 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1809 * why this is not allowed.
1810 */
1811 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1812 ereport(ERROR,
1814 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1815 /* translator: %s is an SQL ALTER command */
1816 errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1817 isadd ?
1818 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1819 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1820
1821 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1822
1823 /* Refresh the new list of publications. */
1824 sub->publications = publist;
1825
1826 AlterSubscription_refresh(sub, opts.copy_data,
1828 }
1829
1830 break;
1831 }
1832
1834 {
1835 if (!sub->enabled)
1836 ereport(ERROR,
1838 errmsg("%s is not allowed for disabled subscriptions",
1839 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
1840
1841 parse_subscription_options(pstate, stmt->options,
1843
1844 /*
1845 * The subscription option "two_phase" requires that
1846 * replication has passed the initial table synchronization
1847 * phase before the two_phase becomes properly enabled.
1848 *
1849 * But, having reached this two-phase commit "enabled" state
1850 * we must not allow any subsequent table initialization to
1851 * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
1852 * disallowed when the user had requested two_phase = on mode.
1853 *
1854 * The exception to this restriction is when copy_data =
1855 * false, because when copy_data is false the tablesync will
1856 * start already in READY state and will exit directly without
1857 * doing anything.
1858 *
1859 * For more details see comments atop worker.c.
1860 */
1861 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1862 ereport(ERROR,
1864 errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
1865 errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1866
1867 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
1868
1869 AlterSubscription_refresh(sub, opts.copy_data, NULL);
1870
1871 break;
1872 }
1873
1875 {
1876 if (!sub->enabled)
1877 ereport(ERROR,
1879 errmsg("%s is not allowed for disabled subscriptions",
1880 "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
1881
1883
1884 break;
1885 }
1886
1888 {
1889 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1890
1891 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1892 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1893
1894 /*
1895 * If the user sets subskiplsn, we do a sanity check to make
1896 * sure that the specified LSN is a probable value.
1897 */
1898 if (XLogRecPtrIsValid(opts.lsn))
1899 {
1901 char originname[NAMEDATALEN];
1902 XLogRecPtr remote_lsn;
1903
1905 originname, sizeof(originname));
1907 remote_lsn = replorigin_get_progress(originid, false);
1908
1909 /* Check the given LSN is at least a future LSN */
1910 if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
1911 ereport(ERROR,
1913 errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
1914 LSN_FORMAT_ARGS(opts.lsn),
1915 LSN_FORMAT_ARGS(remote_lsn))));
1916 }
1917
1920
1921 update_tuple = true;
1922 break;
1923 }
1924
1925 default:
1926 elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1927 stmt->kind);
1928 }
1929
1930 /* Update the catalog if needed. */
1931 if (update_tuple)
1932 {
1934 replaces);
1935
1936 CatalogTupleUpdate(rel, &tup->t_self, tup);
1937
1939 }
1940
1941 /*
1942 * Try to acquire the connection necessary either for modifying the slot
1943 * or for checking if the remote server permits enabling
1944 * retain_dead_tuples.
1945 *
1946 * This has to be at the end because otherwise if there is an error while
1947 * doing the database operations we won't be able to rollback altered
1948 * slot.
1949 */
1951 {
1952 bool must_use_password;
1953 char *err;
1955
1956 /* Load the library providing us libpq calls. */
1957 load_file("libpqwalreceiver", false);
1958
1959 /*
1960 * Try to connect to the publisher, using the new connection string if
1961 * available.
1962 */
1964 wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
1966 &err);
1967 if (!wrconn)
1968 ereport(ERROR,
1970 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1971 sub->name, err)));
1972
1973 PG_TRY();
1974 {
1977
1979 retain_dead_tuples, origin, NULL, 0,
1980 sub->name);
1981
1984 update_failover ? &opts.failover : NULL,
1985 update_two_phase ? &opts.twophase : NULL);
1986 }
1987 PG_FINALLY();
1988 {
1990 }
1991 PG_END_TRY();
1992 }
1993
1995
1997
1999
2000 /* Wake up related replication workers to handle this change quickly. */
2002
2003 return myself;
2004}
@ ACLCHECK_NOT_OWNER
Definition acl.h:185
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition aclchk.c:2654
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition aclchk.c:4090
void LogicalRepWorkersWakeupAtCommit(Oid subid)
Definition worker.c:6259
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:641
static Datum values[MAXATTR]
Definition bootstrap.c:155
#define CStringGetTextDatum(s)
Definition builtins.h:97
#define Assert(condition)
Definition c.h:873
uint32 bits32
Definition c.h:555
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
int errhint(const char *fmt,...)
Definition elog.c:1330
int errcode(int sqlerrcode)
Definition elog.c:863
int errmsg(const char *fmt,...)
Definition elog.c:1080
#define PG_TRY(...)
Definition elog.h:372
#define WARNING
Definition elog.h:36
#define PG_END_TRY(...)
Definition elog.h:397
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define NOTICE
Definition elog.h:35
#define PG_FINALLY(...)
Definition elog.h:389
#define ereport(elevel,...)
Definition elog.h:150
void err(int eval, const char *fmt,...)
Definition err.c:43
#define DirectFunctionCall1(func, arg1)
Definition fmgr.h:684
Oid MyDatabaseId
Definition globals.c:94
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1210
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define stmt
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
return true
Definition isn.c:130
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:293
void ApplyLauncherWakeupAtCommit(void)
Definition launcher.c:1184
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessExclusiveLock
Definition lockdefs.h:43
#define RowExclusiveLock
Definition lockdefs.h:38
Oid GetUserId(void)
Definition miscinit.c:469
Datum namein(PG_FUNCTION_ARGS)
Definition name.c:48
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:229
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
Definition origin.c:1045
@ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH_SEQUENCES
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
@ OBJECT_SUBSCRIPTION
static AmcheckOptions opts
Definition pg_amcheck.c:112
#define NAMEDATALEN
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok)
FormData_pg_subscription * Form_pg_subscription
int pg_strcasecmp(const char *s1, const char *s2)
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
uint64_t Datum
Definition postgres.h:70
static Datum CStringGetDatum(const char *X)
Definition postgres.h:380
static Datum Int32GetDatum(int32 X)
Definition postgres.h:222
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define InvalidOid
unsigned int Oid
static int fb(int x)
#define RelationGetDescr(relation)
Definition rel.h:540
Definition pg_list.h:54
#define SUBOPT_STREAMING
#define SUBOPT_PASSWORD_REQUIRED
#define SUBOPT_SYNCHRONOUS_COMMIT
#define SUBOPT_ENABLED
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
#define SUBOPT_RETAIN_DEAD_TUPLES
#define SUBOPT_ORIGIN
static Datum publicationListToArray(List *publist)
#define SUBOPT_FAILOVER
static void parse_subscription_options(ParseState *pstate, List *stmt_options, bits32 supported_opts, SubOpts *opts)
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_SLOT_NAME
#define SUBOPT_COPY_DATA
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
#define SUBOPT_DISABLE_ON_ERR
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
static void AlterSubscription_refresh_seq(Subscription *sub)
#define SUBOPT_LSN
#define SUBOPT_MAX_RETENTION_DURATION
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
static void check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
#define SUBOPT_BINARY
#define IsSet(val, bits)
#define SUBOPT_REFRESH
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
bool superuser(void)
Definition superuser.c:46
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition syscache.h:93
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool LookupGXactBySubid(Oid subid)
Definition twophase.c:2797
const char * name
static WalReceiverConn * wrconn
Definition walreceiver.c:94
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_disconnect(conn)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3669
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 RepOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_REFRESH_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH_SEQUENCES, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_SKIP, AlterSubscription_refresh(), AlterSubscription_refresh_seq(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum(), CatalogTupleUpdate(), CharGetDatum(), check_pub_dead_tuple_retention(), check_publications_origin_tables(), CheckAlterSubOption(), CheckSubDeadTupleRetention(), Subscription::conninfo, CStringGetDatum(), CStringGetTextDatum, DirectFunctionCall1, elog, Subscription::enabled, ereport, err(), errcode(), errhint(), errmsg(), ERROR, fb(), GETSTRUCT(), GetSubscription(), GetUserId(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, Int32GetDatum(), InvalidOid, InvokeObjectPostAlterHook, IsSet, load_file(), LockSharedObject(), logicalrep_workers_find(), LogicalRepWorkersWakeupAtCommit(), LookupGXactBySubid(), LSN_FORMAT_ARGS, LSNGetDatum(), Subscription::maxretention, merge_publications(), MyDatabaseId, Subscription::name, NAMEDATALEN, namein(), NOTICE, object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, ObjectIdGetDatum(), opts, Subscription::origin, Subscription::ownersuperuser, parse_subscription_options(), Subscription::passwordrequired, PG_END_TRY, PG_FINALLY, pg_strcasecmp(), PG_TRY, PreventInTransactionBlock(), publicationListToArray(), Subscription::publications, RelationGetDescr, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_get_progress(), Subscription::retaindeadtuples, Subscription::retentionactive, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, stmt, SUBOPT_BINARY, SUBOPT_COPY_DATA, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_FAILOVER, SUBOPT_LSN, SUBOPT_MAX_RETENTION_DURATION, SUBOPT_ORIGIN, SUBOPT_PASSWORD_REQUIRED, SUBOPT_REFRESH, SUBOPT_RETAIN_DEAD_TUPLES, SUBOPT_RUN_AS_OWNER, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, superuser(), table_close(), table_open(), Subscription::twophasestate, values, walrcv_alter_slot, walrcv_check_conninfo, walrcv_connect, walrcv_disconnect, WARNING, wrconn, and XLogRecPtrIsValid.

Referenced by ProcessUtilitySlow().

◆ AlterSubscriptionOwner()

ObjectAddress AlterSubscriptionOwner ( const char name,
Oid  newOwnerId 
)
extern

Definition at line 2414 of file subscriptioncmds.c.

2415{
2416 Oid subid;
2417 HeapTuple tup;
2418 Relation rel;
2419 ObjectAddress address;
2421
2423
2426
2427 if (!HeapTupleIsValid(tup))
2428 ereport(ERROR,
2430 errmsg("subscription \"%s\" does not exist", name)));
2431
2433 subid = form->oid;
2434
2436
2438
2440
2442
2443 return address;
2444}
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)

References AlterSubscriptionOwner_internal(), CStringGetDatum(), ereport, errcode(), errmsg(), ERROR, fb(), GETSTRUCT(), heap_freetuple(), HeapTupleIsValid, MyDatabaseId, name, ObjectAddressSet, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy2, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

◆ AlterSubscriptionOwner_oid()

void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)
extern

Definition at line 2450 of file subscriptioncmds.c.

2451{
2452 HeapTuple tup;
2453 Relation rel;
2454
2456
2458
2459 if (!HeapTupleIsValid(tup))
2460 ereport(ERROR,
2462 errmsg("subscription with OID %u does not exist", subid)));
2463
2465
2467
2469}
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg(), ERROR, fb(), heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by shdepReassignOwned_Owner().

◆ CheckSubDeadTupleRetention()

void CheckSubDeadTupleRetention ( bool  check_guc,
bool  sub_disabled,
int  elevel_for_sub_disabled,
bool  retain_dead_tuples,
bool  retention_active,
bool  max_retention_set 
)
extern

Definition at line 2811 of file subscriptioncmds.c.

2815{
2818
2820 {
2822 ereport(ERROR,
2824 errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
2825 errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
2826
2830 errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
2831 errhint("Consider setting \"%s\" to true.",
2832 "track_commit_timestamp"));
2833
2837 errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
2839 ? errhint("Consider setting %s to false.",
2840 "retain_dead_tuples") : 0);
2841 }
2842 else if (max_retention_set)
2843 {
2846 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
2847 }
2848}
bool track_commit_timestamp
Definition commit_ts.c:109
int wal_level
Definition xlog.c:134
@ WAL_LEVEL_REPLICA
Definition xlog.h:76

References Assert, ereport, errcode(), errhint(), errmsg(), ERROR, fb(), NOTICE, track_commit_timestamp, wal_level, WAL_LEVEL_REPLICA, and WARNING.

Referenced by AlterSubscription(), CreateSubscription(), and DisableSubscriptionAndExit().

◆ CreateSubscription()

ObjectAddress CreateSubscription ( ParseState pstate,
CreateSubscriptionStmt stmt,
bool  isTopLevel 
)
extern

Definition at line 586 of file subscriptioncmds.c.

588{
589 Relation rel;
591 Oid subid;
592 bool nulls[Natts_pg_subscription];
594 Oid owner = GetUserId();
596 char *conninfo;
598 List *publications;
600 SubOpts opts = {0};
602
603 /*
604 * Parse and check options.
605 *
606 * Connection and publication should not be specified here.
607 */
617
618 /*
619 * Since creating a replication slot is not transactional, rolling back
620 * the transaction leaves the created replication slot. So we cannot run
621 * CREATE SUBSCRIPTION inside a transaction block if creating a
622 * replication slot.
623 */
624 if (opts.create_slot)
625 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
626
627 /*
628 * We don't want to allow unprivileged users to be able to trigger
629 * attempts to access arbitrary network destinations, so require the user
630 * to have been specifically authorized to create subscriptions.
631 */
635 errmsg("permission denied to create subscription"),
636 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
637 "pg_create_subscription")));
638
639 /*
640 * Since a subscription is a database object, we also check for CREATE
641 * permission on the database.
642 */
644 owner, ACL_CREATE);
645 if (aclresult != ACLCHECK_OK)
648
649 /*
650 * Non-superusers are required to set a password for authentication, and
651 * that password must be used by the target server, but the superuser can
652 * exempt a subscription from this requirement.
653 */
654 if (!opts.passwordrequired && !superuser_arg(owner))
657 errmsg("password_required=false is superuser-only"),
658 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
659
660 /*
661 * If built with appropriate switch, whine when regression-testing
662 * conventions for subscription names are violated.
663 */
664#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
665 if (strncmp(stmt->subname, "regress_", 8) != 0)
666 elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
667#endif
668
670
671 /* Check if name is used */
674 if (OidIsValid(subid))
675 {
678 errmsg("subscription \"%s\" already exists",
679 stmt->subname)));
680 }
681
682 /*
683 * Ensure that system configuration parameters are set appropriately to
684 * support retain_dead_tuples and max_retention_duration.
685 */
687 opts.retaindeadtuples, opts.retaindeadtuples,
688 (opts.maxretention > 0));
689
690 if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
691 opts.slot_name == NULL)
692 opts.slot_name = stmt->subname;
693
694 /* The default for synchronous_commit of subscriptions is off. */
695 if (opts.synchronous_commit == NULL)
696 opts.synchronous_commit = "off";
697
698 conninfo = stmt->conninfo;
699 publications = stmt->publication;
700
701 /* Load the library providing us libpq calls. */
702 load_file("libpqwalreceiver", false);
703
704 /* Check the connection info string. */
705 walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
706
707 /* Everything ok, form a new tuple. */
708 memset(values, 0, sizeof(values));
709 memset(nulls, false, sizeof(nulls));
710
723 CharGetDatum(opts.twophase ?
731 BoolGetDatum(opts.retaindeadtuples);
733 Int32GetDatum(opts.maxretention);
735 Int32GetDatum(opts.retaindeadtuples);
737 CStringGetTextDatum(conninfo);
738 if (opts.slot_name)
741 else
742 nulls[Anum_pg_subscription_subslotname - 1] = true;
744 CStringGetTextDatum(opts.synchronous_commit);
746 publicationListToArray(publications);
749
751
752 /* Insert tuple into catalog. */
755
757
758 /*
759 * A replication origin is currently created for all subscriptions,
760 * including those that only contain sequences or are otherwise empty.
761 *
762 * XXX: While this is technically unnecessary, optimizing it would require
763 * additional logic to skip origin creation during DDL operations and
764 * apply workers initialization, and to handle origin creation dynamically
765 * when tables are added to the subscription. It is not clear whether
766 * preventing creation of origins is worth additional complexity.
767 */
770
771 /*
772 * Connect to remote side to execute requested commands and fetch table
773 * and sequence info.
774 */
775 if (opts.connect)
776 {
777 char *err;
780
781 /* Try to connect to the publisher. */
782 must_use_password = !superuser_arg(owner) && opts.passwordrequired;
783 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
784 stmt->subname, &err);
785 if (!wrconn)
788 errmsg("subscription \"%s\" could not connect to the publisher: %s",
789 stmt->subname, err)));
790
791 PG_TRY();
792 {
793 bool has_tables = false;
794 List *pubrels;
795 char relation_state;
796
797 check_publications(wrconn, publications);
799 opts.copy_data,
800 opts.retaindeadtuples, opts.origin,
801 NULL, 0, stmt->subname);
803 opts.copy_data, opts.origin,
804 NULL, 0, stmt->subname);
805
806 if (opts.retaindeadtuples)
808
809 /*
810 * Set sync state based on if we were asked to do data copy or
811 * not.
812 */
814
815 /*
816 * Build local relation status info. Relations are for both tables
817 * and sequences from the publisher.
818 */
819 pubrels = fetch_relation_list(wrconn, publications);
820
822 {
823 Oid relid;
824 char relkind;
825 RangeVar *rv = pubrelinfo->rv;
826
827 relid = RangeVarGetRelid(rv, AccessShareLock, false);
828 relkind = get_rel_relkind(relid);
829
830 /* Check for supported relkind. */
831 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
832 rv->schemaname, rv->relname);
833 has_tables |= (relkind != RELKIND_SEQUENCE);
835 InvalidXLogRecPtr, true);
836 }
837
838 /*
839 * If requested, create permanent slot for the subscription. We
840 * won't use the initial snapshot for anything, so no need to
841 * export it.
842 *
843 * XXX: Similar to origins, it is not clear whether preventing the
844 * slot creation for empty and sequence-only subscriptions is
845 * worth additional complexity.
846 */
847 if (opts.create_slot)
848 {
849 bool twophase_enabled = false;
850
851 Assert(opts.slot_name);
852
853 /*
854 * Even if two_phase is set, don't create the slot with
855 * two-phase enabled. Will enable it once all the tables are
856 * synced and ready. This avoids race-conditions like prepared
857 * transactions being skipped due to changes not being applied
858 * due to checks in should_apply_changes_for_rel() when
859 * tablesync for the corresponding tables are in progress. See
860 * comments atop worker.c.
861 *
862 * Note that if tables were specified but copy_data is false
863 * then it is safe to enable two_phase up-front because those
864 * tables are already initially in READY state. When the
865 * subscription has no tables, we leave the twophase state as
866 * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
867 * PUBLICATION to work.
868 */
869 if (opts.twophase && !opts.copy_data && has_tables)
870 twophase_enabled = true;
871
873 opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
874
877
879 (errmsg("created replication slot \"%s\" on publisher",
880 opts.slot_name)));
881 }
882 }
883 PG_FINALLY();
884 {
886 }
887 PG_END_TRY();
888 }
889 else
891 (errmsg("subscription was created, but is not connected"),
892 errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
893
895
897
898 /*
899 * Notify the launcher to start the apply worker if the subscription is
900 * enabled, or to create the conflict detection slot if retain_dead_tuples
901 * is enabled.
902 *
903 * Creating the conflict detection slot is essential even when the
904 * subscription is not enabled. This ensures that dead tuples are
905 * retained, which is necessary for accurately identifying the type of
906 * conflict during replication.
907 */
908 if (opts.enabled || opts.retaindeadtuples)
910
912
914
915 return myself;
916}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5284
AclResult
Definition acl.h:182
@ ACLCHECK_OK
Definition acl.h:183
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition aclchk.c:3836
#define OidIsValid(objectId)
Definition c.h:788
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition catalog.c:448
int errdetail(const char *fmt,...)
Definition elog.c:1216
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1117
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
#define AccessShareLock
Definition lockdefs.h:36
char * get_database_name(Oid dbid)
Definition lsyscache.c:1242
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2153
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition namespace.h:98
#define InvokeObjectPostCreateHook(classId, objectId, subId)
RepOriginId replorigin_create(const char *roname)
Definition origin.c:260
@ OBJECT_DATABASE
#define ACL_CREATE
Definition parsenodes.h:85
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
void pgstat_create_subscription(Oid subid)
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
char * relname
Definition primnodes.h:83
char * schemaname
Definition primnodes.h:80
#define SUBOPT_CREATE_SLOT
static void check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static void check_publications(WalReceiverConn *wrconn, List *publications)
static List * fetch_relation_list(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_CONNECT
bool superuser_arg(Oid roleid)
Definition superuser.c:56
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition syscache.h:111
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition tablesync.c:1649
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References AccessShareLock, ACL_CREATE, aclcheck_error(), ACLCHECK_OK, AddSubscriptionRelState(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum(), CatalogTupleInsert(), CharGetDatum(), check_pub_dead_tuple_retention(), check_publications(), check_publications_origin_sequences(), check_publications_origin_tables(), CheckSubDeadTupleRetention(), CheckSubscriptionRelkind(), CRS_NOEXPORT_SNAPSHOT, CStringGetDatum(), CStringGetTextDatum, DirectFunctionCall1, elog, ereport, err(), errcode(), ERRCODE_DUPLICATE_OBJECT, errdetail(), errhint(), errmsg(), ERROR, fb(), fetch_relation_list(), foreach_ptr, get_database_name(), get_rel_relkind(), GetNewOidWithIndex(), GetSysCacheOid2, GetUserId(), has_privs_of_role(), heap_form_tuple(), heap_freetuple(), Int32GetDatum(), InvalidOid, InvalidXLogRecPtr, InvokeObjectPostCreateHook, IsSet, load_file(), LSNGetDatum(), MyDatabaseId, NAMEDATALEN, namein(), NOTICE, object_aclcheck(), OBJECT_DATABASE, ObjectAddressSet, ObjectIdGetDatum(), OidIsValid, opts, parse_subscription_options(), PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_create_subscription(), PreventInTransactionBlock(), publicationListToArray(), RangeVarGetRelid, recordDependencyOnOwner(), RelationGetDescr, RangeVar::relname, ReplicationOriginNameForLogicalRep(), replorigin_create(), RowExclusiveLock, RangeVar::schemaname, stmt, SUBOPT_BINARY, SUBOPT_CONNECT, SUBOPT_COPY_DATA, SUBOPT_CREATE_SLOT, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_FAILOVER, SUBOPT_MAX_RETENTION_DURATION, SUBOPT_ORIGIN, SUBOPT_PASSWORD_REQUIRED, SUBOPT_RETAIN_DEAD_TUPLES, SUBOPT_RUN_AS_OWNER, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, superuser(), superuser_arg(), table_close(), table_open(), UpdateTwoPhaseState(), values, walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, WARNING, and wrconn.

Referenced by ProcessUtilitySlow().

◆ defGetStreamingMode()

char defGetStreamingMode ( DefElem def)
extern

Definition at line 3142 of file subscriptioncmds.c.

3143{
3144 /*
3145 * If no parameter value given, assume "true" is meant.
3146 */
3147 if (!def->arg)
3148 return LOGICALREP_STREAM_ON;
3149
3150 /*
3151 * Allow 0, 1, "false", "true", "off", "on" or "parallel".
3152 */
3153 switch (nodeTag(def->arg))
3154 {
3155 case T_Integer:
3156 switch (intVal(def->arg))
3157 {
3158 case 0:
3159 return LOGICALREP_STREAM_OFF;
3160 case 1:
3161 return LOGICALREP_STREAM_ON;
3162 default:
3163 /* otherwise, error out below */
3164 break;
3165 }
3166 break;
3167 default:
3168 {
3169 char *sval = defGetString(def);
3170
3171 /*
3172 * The set of strings accepted here should match up with the
3173 * grammar's opt_boolean_or_string production.
3174 */
3175 if (pg_strcasecmp(sval, "false") == 0 ||
3176 pg_strcasecmp(sval, "off") == 0)
3177 return LOGICALREP_STREAM_OFF;
3178 if (pg_strcasecmp(sval, "true") == 0 ||
3179 pg_strcasecmp(sval, "on") == 0)
3180 return LOGICALREP_STREAM_ON;
3181 if (pg_strcasecmp(sval, "parallel") == 0)
3183 }
3184 break;
3185 }
3186
3187 ereport(ERROR,
3189 errmsg("%s requires a Boolean value or \"parallel\"",
3190 def->defname)));
3191 return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
3192}
char * defGetString(DefElem *def)
Definition define.c:34
#define nodeTag(nodeptr)
Definition nodes.h:139
char * defname
Definition parsenodes.h:844
Node * arg
Definition parsenodes.h:845
#define intVal(v)
Definition value.h:79

References DefElem::arg, defGetString(), DefElem::defname, ereport, errcode(), errmsg(), ERROR, fb(), intVal, nodeTag, and pg_strcasecmp().

Referenced by parse_output_parameters(), and parse_subscription_options().

◆ DropSubscription()

void DropSubscription ( DropSubscriptionStmt stmt,
bool  isTopLevel 
)
extern

Definition at line 2010 of file subscriptioncmds.c.

2011{
2012 Relation rel;
2014 HeapTuple tup;
2015 Oid subid;
2016 Oid subowner;
2017 Datum datum;
2018 bool isnull;
2019 char *subname;
2020 char *conninfo;
2021 char *slotname;
2023 ListCell *lc;
2024 char originname[NAMEDATALEN];
2025 char *err = NULL;
2028 List *rstates;
2029 bool must_use_password;
2030
2031 /*
2032 * The launcher may concurrently start a new worker for this subscription.
2033 * During initialization, the worker checks for subscription validity and
2034 * exits if the subscription has already been dropped. See
2035 * InitializeLogRepWorker.
2036 */
2038
2040 CStringGetDatum(stmt->subname));
2041
2042 if (!HeapTupleIsValid(tup))
2043 {
2044 table_close(rel, NoLock);
2045
2046 if (!stmt->missing_ok)
2047 ereport(ERROR,
2049 errmsg("subscription \"%s\" does not exist",
2050 stmt->subname)));
2051 else
2053 (errmsg("subscription \"%s\" does not exist, skipping",
2054 stmt->subname)));
2055
2056 return;
2057 }
2058
2060 subid = form->oid;
2061 subowner = form->subowner;
2062 must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
2063
2064 /* must be owner */
2067 stmt->subname);
2068
2069 /* DROP hook for the subscription being removed */
2071
2072 /*
2073 * Lock the subscription so nobody else can do anything with it (including
2074 * the replication workers).
2075 */
2077
2078 /* Get subname */
2081 subname = pstrdup(NameStr(*DatumGetName(datum)));
2082
2083 /* Get conninfo */
2086 conninfo = TextDatumGetCString(datum);
2087
2088 /* Get slotname */
2091 if (!isnull)
2092 slotname = pstrdup(NameStr(*DatumGetName(datum)));
2093 else
2094 slotname = NULL;
2095
2096 /*
2097 * Since dropping a replication slot is not transactional, the replication
2098 * slot stays dropped even if the transaction rolls back. So we cannot
2099 * run DROP SUBSCRIPTION inside a transaction block if dropping the
2100 * replication slot. Also, in this case, we report a message for dropping
2101 * the subscription to the cumulative stats system.
2102 *
2103 * XXX The command name should really be something like "DROP SUBSCRIPTION
2104 * of a subscription that is associated with a replication slot", but we
2105 * don't have the proper facilities for that.
2106 */
2107 if (slotname)
2108 PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
2109
2112
2113 /* Remove the tuple from catalog. */
2114 CatalogTupleDelete(rel, &tup->t_self);
2115
2117
2118 /*
2119 * Stop all the subscription workers immediately.
2120 *
2121 * This is necessary if we are dropping the replication slot, so that the
2122 * slot becomes accessible.
2123 *
2124 * It is also necessary if the subscription is disabled and was disabled
2125 * in the same transaction. Then the workers haven't seen the disabling
2126 * yet and will still be running, leading to hangs later when we want to
2127 * drop the replication origin. If the subscription was disabled before
2128 * this transaction, then there shouldn't be any workers left, so this
2129 * won't make a difference.
2130 *
2131 * New workers won't be started because we hold an exclusive lock on the
2132 * subscription till the end of the transaction.
2133 */
2134 subworkers = logicalrep_workers_find(subid, false, true);
2135 foreach(lc, subworkers)
2136 {
2138
2140 }
2142
2143 /*
2144 * Remove the no-longer-useful entry in the launcher's table of apply
2145 * worker start times.
2146 *
2147 * If this transaction rolls back, the launcher might restart a failed
2148 * apply worker before wal_retrieve_retry_interval milliseconds have
2149 * elapsed, but that's pretty harmless.
2150 */
2152
2153 /*
2154 * Cleanup of tablesync replication origins.
2155 *
2156 * Any READY-state relations would already have dealt with clean-ups.
2157 *
2158 * Note that the state can't change because we have already stopped both
2159 * the apply and tablesync workers and they can't restart because of
2160 * exclusive lock on the subscription.
2161 */
2162 rstates = GetSubscriptionRelations(subid, true, false, true);
2163 foreach(lc, rstates)
2164 {
2166 Oid relid = rstate->relid;
2167
2168 /* Only cleanup resources of tablesync workers */
2169 if (!OidIsValid(relid))
2170 continue;
2171
2172 /*
2173 * Drop the tablesync's origin tracking if exists.
2174 *
2175 * It is possible that the origin is not yet created for tablesync
2176 * worker so passing missing_ok = true. This can happen for the states
2177 * before SUBREL_STATE_DATASYNC.
2178 */
2180 sizeof(originname));
2181 replorigin_drop_by_name(originname, true, false);
2182 }
2183
2184 /* Clean up dependencies */
2186
2187 /* Remove any associated relation synchronization states. */
2189
2190 /* Remove the origin tracking if exists. */
2192 replorigin_drop_by_name(originname, true, false);
2193
2194 /*
2195 * Tell the cumulative stats system that the subscription is getting
2196 * dropped.
2197 */
2199
2200 /*
2201 * If there is no slot associated with the subscription, we can finish
2202 * here.
2203 */
2204 if (!slotname && rstates == NIL)
2205 {
2206 table_close(rel, NoLock);
2207 return;
2208 }
2209
2210 /*
2211 * Try to acquire the connection necessary for dropping slots.
2212 *
2213 * Note: If the slotname is NONE/NULL then we allow the command to finish
2214 * and users need to manually cleanup the apply and tablesync worker slots
2215 * later.
2216 *
2217 * This has to be at the end because otherwise if there is an error while
2218 * doing the database operations we won't be able to rollback dropped
2219 * slot.
2220 */
2221 load_file("libpqwalreceiver", false);
2222
2223 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2224 subname, &err);
2225 if (wrconn == NULL)
2226 {
2227 if (!slotname)
2228 {
2229 /* be tidy */
2231 table_close(rel, NoLock);
2232 return;
2233 }
2234 else
2235 {
2236 ReportSlotConnectionError(rstates, subid, slotname, err);
2237 }
2238 }
2239
2240 PG_TRY();
2241 {
2242 foreach(lc, rstates)
2243 {
2245 Oid relid = rstate->relid;
2246
2247 /* Only cleanup resources of tablesync workers */
2248 if (!OidIsValid(relid))
2249 continue;
2250
2251 /*
2252 * Drop the tablesync slots associated with removed tables.
2253 *
2254 * For SYNCDONE/READY states, the tablesync slot is known to have
2255 * already been dropped by the tablesync worker.
2256 *
2257 * For other states, there is no certainty, maybe the slot does
2258 * not exist yet. Also, if we fail after removing some of the
2259 * slots, next time, it will again try to drop already dropped
2260 * slots and fail. For these reasons, we allow missing_ok = true
2261 * for the drop.
2262 */
2263 if (rstate->state != SUBREL_STATE_SYNCDONE)
2264 {
2265 char syncslotname[NAMEDATALEN] = {0};
2266
2268 sizeof(syncslotname));
2270 }
2271 }
2272
2274
2275 /*
2276 * If there is a slot associated with the subscription, then drop the
2277 * replication slot at the publisher.
2278 */
2279 if (slotname)
2280 ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2281 }
2282 PG_FINALLY();
2283 {
2285 }
2286 PG_END_TRY();
2287
2288 table_close(rel, NoLock);
2289}
#define TextDatumGetCString(d)
Definition builtins.h:98
#define NameStr(name)
Definition c.h:765
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:652
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1154
void list_free(List *list)
Definition list.c:1546
#define NoLock
Definition lockdefs.h:34
char * pstrdup(const char *in)
Definition mcxt.c:1781
#define InvokeObjectDropHook(classId, objectId, subId)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:445
#define lfirst(lc)
Definition pg_list.h:172
#define NIL
Definition pg_list.h:68
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void RemoveSubscriptionRel(Oid subid, Oid relid)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
NameData subname
void pgstat_drop_subscription(Oid subid)
static Name DatumGetName(Datum X)
Definition postgres.h:390
LogicalRepWorkerType type
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:595
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition syscache.c:230
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition syscache.c:625
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition tablesync.c:1203

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, ApplyLauncherForgetWorkerStartTime(), CatalogTupleDelete(), CStringGetDatum(), DatumGetName(), deleteSharedDependencyRecordsFor(), ereport, err(), errcode(), errmsg(), ERROR, EventTriggerSQLDropAddObject(), fb(), GETSTRUCT(), GetSubscriptionRelations(), GetUserId(), HeapTupleIsValid, InvalidOid, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), MyDatabaseId, NAMEDATALEN, NameStr, NIL, NoLock, NOTICE, object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, ObjectIdGetDatum(), OidIsValid, PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_drop_subscription(), PreventInTransactionBlock(), pstrdup(), ReleaseSysCache(), SubscriptionRelState::relid, LogicalRepWorker::relid, RemoveSubscriptionRel(), ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), ReportSlotConnectionError(), RowExclusiveLock, SearchSysCache2(), SubscriptionRelState::state, stmt, LogicalRepWorker::subid, subname, superuser_arg(), SysCacheGetAttr(), SysCacheGetAttrNotNull(), table_close(), table_open(), TextDatumGetCString, LogicalRepWorker::type, walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by ProcessUtilitySlow().