PostgreSQL Source Code git master
Loading...
Searching...
No Matches
subscriptioncmds.h File Reference
Include dependency graph for subscriptioncmds.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

ObjectAddress CreateSubscription (ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscription (ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 
char defGetStreamingMode (DefElem *def)
 
void CheckSubDeadTupleRetention (bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
 

Function Documentation

◆ AlterSubscription()

ObjectAddress AlterSubscription ( ParseState *pstate,
AlterSubscriptionStmt *stmt,
bool  isTopLevel 
)
extern

Definition at line 1418 of file subscriptioncmds.c.

1420{
1421 Relation rel;
1423 bool nulls[Natts_pg_subscription];
1426 HeapTuple tup;
1427 Oid subid;
1428 bool update_tuple = false;
1429 bool update_failover = false;
1430 bool update_two_phase = false;
1431 bool check_pub_rdt = false;
1432 bool retain_dead_tuples;
1433 int max_retention;
1434 bool retention_active;
1435 char *new_conninfo = NULL;
1436 char *origin;
1437 Subscription *sub;
1440 SubOpts opts = {0};
1441
1443
1444 /* Fetch the existing tuple. */
1446 CStringGetDatum(stmt->subname));
1447
1448 if (!HeapTupleIsValid(tup))
1449 ereport(ERROR,
1451 errmsg("subscription \"%s\" does not exist",
1452 stmt->subname)));
1453
1455 subid = form->oid;
1456
1457 /* must be owner */
1460 stmt->subname);
1461
1462 /*
1463 * Skip ACL checks on the subscription's foreign server, if any. If
1464 * changing the server (or replacing it with a raw connection), then the
1465 * old one will be removed anyway. If changing something unrelated,
1466 * there's no need to do an additional ACL check here; that will be done
1467 * by the subscription worker anyway.
1468 */
1469 sub = GetSubscription(subid, false, false);
1470
1472 origin = sub->origin;
1475
1476 /*
1477 * Don't allow non-superuser modification of a subscription with
1478 * password_required=false.
1479 */
1480 if (!sub->passwordrequired && !superuser())
1481 ereport(ERROR,
1483 errmsg("password_required=false is superuser-only"),
1484 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1485
1486 /* Lock the subscription so nobody else can do anything with it. */
1488
1489 /* Form a new tuple. */
1490 memset(values, 0, sizeof(values));
1491 memset(nulls, false, sizeof(nulls));
1492 memset(replaces, false, sizeof(replaces));
1493
1495
1496 switch (stmt->kind)
1497 {
1499 {
1510
1511 parse_subscription_options(pstate, stmt->options,
1513
1514 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1515 {
1516 /*
1517 * The subscription must be disabled to allow slot_name as
1518 * 'none', otherwise, the apply worker will repeatedly try
1519 * to stream the data using that slot_name which neither
1520 * exists on the publisher nor the user will be allowed to
1521 * create it.
1522 */
1523 if (sub->enabled && !opts.slot_name)
1524 ereport(ERROR,
1526 errmsg("cannot set %s for enabled subscription",
1527 "slot_name = NONE")));
1528
1529 if (opts.slot_name)
1532 else
1533 nulls[Anum_pg_subscription_subslotname - 1] = true;
1535 }
1536
1537 if (opts.synchronous_commit)
1538 {
1540 CStringGetTextDatum(opts.synchronous_commit);
1542 }
1543
1544 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1545 {
1547 BoolGetDatum(opts.binary);
1549 }
1550
1551 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1552 {
1554 CharGetDatum(opts.streaming);
1556 }
1557
1558 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1559 {
1561 = BoolGetDatum(opts.disableonerr);
1563 = true;
1564 }
1565
1566 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1567 {
1568 /* Non-superuser may not disable password_required. */
1569 if (!opts.passwordrequired && !superuser())
1570 ereport(ERROR,
1572 errmsg("password_required=false is superuser-only"),
1573 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1574
1576 = BoolGetDatum(opts.passwordrequired);
1578 = true;
1579 }
1580
1581 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1582 {
1584 BoolGetDatum(opts.runasowner);
1586 }
1587
1588 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1589 {
1590 /*
1591 * We need to update both the slot and the subscription
1592 * for the two_phase option. We can enable the two_phase
1593 * option for a slot only once the initial data
1594 * synchronization is done. This is to avoid missing some
1595 * data as explained in comments atop worker.c.
1596 */
1597 update_two_phase = !opts.twophase;
1598
1599 CheckAlterSubOption(sub, "two_phase", update_two_phase,
1600 isTopLevel);
1601
1602 /*
1603 * Modifying the two_phase slot option requires a slot
1604 * lookup by slot name, so changing the slot name at the
1605 * same time is not allowed.
1606 */
1607 if (update_two_phase &&
1608 IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1609 ereport(ERROR,
1611 errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1612
1613 /*
1614 * Note that workers may still survive even if the
1615 * subscription has been disabled.
1616 *
1617 * Ensure workers have already been exited to avoid
1618 * getting prepared transactions while we are disabling
1619 * the two_phase option. Otherwise, the changes of an
1620 * already prepared transaction can be replicated again
1621 * along with its corresponding commit, leading to
1622 * duplicate data or errors.
1623 */
1624 if (logicalrep_workers_find(subid, true, true))
1625 ereport(ERROR,
1627 errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1628 errhint("Try again after some time.")));
1629
1630 /*
1631 * two_phase cannot be disabled if there are any
1632 * uncommitted prepared transactions present otherwise it
1633 * can lead to duplicate data or errors as explained in
1634 * the comment above.
1635 */
1636 if (update_two_phase &&
1638 LookupGXactBySubid(subid))
1639 ereport(ERROR,
1641 errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1642 errhint("Resolve these transactions and try again.")));
1643
1644 /* Change system catalog accordingly */
1646 CharGetDatum(opts.twophase ?
1650 }
1651
1652 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1653 {
1654 /*
1655 * Similar to the two_phase case above, we need to update
1656 * the failover option for both the slot and the
1657 * subscription.
1658 */
1659 update_failover = true;
1660
1661 CheckAlterSubOption(sub, "failover", update_failover,
1662 isTopLevel);
1663
1665 BoolGetDatum(opts.failover);
1667 }
1668
1669 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1670 {
1672 BoolGetDatum(opts.retaindeadtuples);
1674
1675 /*
1676 * Update the retention status only if there's a change in
1677 * the retain_dead_tuples option value.
1678 *
1679 * Automatically marking retention as active when
1680 * retain_dead_tuples is enabled may not always be ideal,
1681 * especially if retention was previously stopped and the
1682 * user toggles retain_dead_tuples without adjusting the
1683 * publisher workload. However, this behavior provides a
1684 * convenient way for users to manually refresh the
1685 * retention status. Since retention will be stopped again
1686 * unless the publisher workload is reduced, this approach
1687 * is acceptable for now.
1688 */
1689 if (opts.retaindeadtuples != sub->retaindeadtuples)
1690 {
1692 BoolGetDatum(opts.retaindeadtuples);
1694
1695 retention_active = opts.retaindeadtuples;
1696 }
1697
1698 CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1699
1700 /*
1701 * Workers may continue running even after the
1702 * subscription has been disabled.
1703 *
1704 * To prevent race conditions (as described in
1705 * CheckAlterSubOption()), ensure that all worker
1706 * processes have already exited before proceeding.
1707 */
1708 if (logicalrep_workers_find(subid, true, true))
1709 ereport(ERROR,
1711 errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1712 errhint("Try again after some time.")));
1713
1714 /*
1715 * Notify the launcher to manage the replication slot for
1716 * conflict detection. This ensures that replication slot
1717 * is efficiently handled (created, updated, or dropped)
1718 * in response to any configuration changes.
1719 */
1721
1722 check_pub_rdt = opts.retaindeadtuples;
1723 retain_dead_tuples = opts.retaindeadtuples;
1724 }
1725
1726 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1727 {
1729 Int32GetDatum(opts.maxretention);
1731
1732 max_retention = opts.maxretention;
1733 }
1734
1735 /*
1736 * Ensure that system configuration parameters are set
1737 * appropriately to support retain_dead_tuples and
1738 * max_retention_duration.
1739 */
1740 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1741 IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1745 (max_retention > 0));
1746
1747 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1748 {
1750 CStringGetTextDatum(opts.origin);
1752
1753 /*
1754 * Check if changes from different origins may be received
1755 * from the publisher when the origin is changed to ANY
1756 * and retain_dead_tuples is enabled. Use |= so that we
1757 * don't clear the flag already set when
1758 * retain_dead_tuples was changed in the same command.
1759 */
1762
1763 origin = opts.origin;
1764 }
1765
1766 if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
1767 {
1769 CStringGetTextDatum(opts.wal_receiver_timeout);
1771 }
1772
1773 update_tuple = true;
1774 break;
1775 }
1776
1778 {
1779 parse_subscription_options(pstate, stmt->options,
1781 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1782
1783 if (!sub->slotname && opts.enabled)
1784 ereport(ERROR,
1786 errmsg("cannot enable subscription that does not have a slot name")));
1787
1788 /*
1789 * Check track_commit_timestamp only when enabling the
1790 * subscription in case it was disabled after creation. See
1791 * comments atop CheckSubDeadTupleRetention() for details.
1792 */
1793 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1795 sub->retentionactive, false);
1796
1798 BoolGetDatum(opts.enabled);
1800
1801 if (opts.enabled)
1803
1804 update_tuple = true;
1805
1806 /*
1807 * The subscription might be initially created with
1808 * connect=false and retain_dead_tuples=true, meaning the
1809 * remote server's status may not be checked. Ensure this
1810 * check is conducted now.
1811 */
1812 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1813 break;
1814 }
1815
1817 {
1821
1822 /*
1823 * Remove what was there before, either another foreign server
1824 * or a connection string.
1825 */
1826 if (form->subserver)
1827 {
1830 ForeignServerRelationId, form->subserver);
1831 }
1832 else
1833 {
1834 nulls[Anum_pg_subscription_subconninfo - 1] = true;
1836 }
1837
1838 /*
1839 * Check that the subscription owner has USAGE privileges on
1840 * the server.
1841 */
1842 new_server = GetForeignServerByName(stmt->servername, false);
1844 new_server->serverid,
1845 form->subowner, ACL_USAGE);
1846 if (aclresult != ACLCHECK_OK)
1847 ereport(ERROR,
1849 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
1850 GetUserNameFromId(form->subowner, false),
1851 new_server->servername));
1852
1853 /* make sure a user mapping exists */
1854 GetUserMapping(form->subowner, new_server->serverid);
1855
1857 new_server);
1858
1859 /* Load the library providing us libpq calls. */
1860 load_file("libpqwalreceiver", false);
1861 /* Check the connection info string. */
1863 sub->passwordrequired && !sub->ownersuperuser);
1864
1867
1870
1871 update_tuple = true;
1872 }
1873
1874 /*
1875 * Since the remote server configuration might have changed,
1876 * perform a check to ensure it permits enabling
1877 * retain_dead_tuples.
1878 */
1880 break;
1881
1883 /* remove reference to foreign server and dependencies, if present */
1884 if (form->subserver)
1885 {
1888 ForeignServerRelationId, form->subserver);
1889
1892 }
1893
1894 new_conninfo = stmt->conninfo;
1895
1896 /* Load the library providing us libpq calls. */
1897 load_file("libpqwalreceiver", false);
1898 /* Check the connection info string. */
1900 sub->passwordrequired && !sub->ownersuperuser);
1901
1903 CStringGetTextDatum(stmt->conninfo);
1905 update_tuple = true;
1906
1907 /*
1908 * Since the remote server configuration might have changed,
1909 * perform a check to ensure it permits enabling
1910 * retain_dead_tuples.
1911 */
1913 break;
1914
1916 {
1918 parse_subscription_options(pstate, stmt->options,
1920
1922 publicationListToArray(stmt->publication);
1924
1925 update_tuple = true;
1926
1927 /* Refresh if user asked us to. */
1928 if (opts.refresh)
1929 {
1930 if (!sub->enabled)
1931 ereport(ERROR,
1933 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1934 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1935
1936 /*
1937 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1938 * why this is not allowed.
1939 */
1940 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1941 ereport(ERROR,
1943 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1944 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1945
1946 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1947
1948 /* Make sure refresh sees the new list of publications. */
1949 sub->publications = stmt->publication;
1950
1951 AlterSubscription_refresh(sub, opts.copy_data,
1952 stmt->publication);
1953 }
1954
1955 break;
1956 }
1957
1960 {
1961 List *publist;
1963
1965 parse_subscription_options(pstate, stmt->options,
1967
1968 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1972
1973 update_tuple = true;
1974
1975 /* Refresh if user asked us to. */
1976 if (opts.refresh)
1977 {
1978 /* We only need to validate user specified publications. */
1979 List *validate_publications = (isadd) ? stmt->publication : NULL;
1980
1981 if (!sub->enabled)
1982 ereport(ERROR,
1984 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1985 /* translator: %s is an SQL ALTER command */
1986 errhint("Use %s instead.",
1987 isadd ?
1988 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1989 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1990
1991 /*
1992 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1993 * why this is not allowed.
1994 */
1995 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1996 ereport(ERROR,
1998 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1999 /* translator: %s is an SQL ALTER command */
2000 errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
2001 isadd ?
2002 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
2003 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
2004
2005 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
2006
2007 /* Refresh the new list of publications. */
2008 sub->publications = publist;
2009
2010 AlterSubscription_refresh(sub, opts.copy_data,
2012 }
2013
2014 break;
2015 }
2016
2018 {
2019 if (!sub->enabled)
2020 ereport(ERROR,
2022 errmsg("%s is not allowed for disabled subscriptions",
2023 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
2024
2025 parse_subscription_options(pstate, stmt->options,
2027
2028 /*
2029 * The subscription option "two_phase" requires that
2030 * replication has passed the initial table synchronization
2031 * phase before the two_phase becomes properly enabled.
2032 *
2033 * But, having reached this two-phase commit "enabled" state
2034 * we must not allow any subsequent table initialization to
2035 * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
2036 * disallowed when the user had requested two_phase = on mode.
2037 *
2038 * The exception to this restriction is when copy_data =
2039 * false, because when copy_data is false the tablesync will
2040 * start already in READY state and will exit directly without
2041 * doing anything.
2042 *
2043 * For more details see comments atop worker.c.
2044 */
2045 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
2046 ereport(ERROR,
2048 errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
2049 errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
2050
2051 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
2052
2053 AlterSubscription_refresh(sub, opts.copy_data, NULL);
2054
2055 break;
2056 }
2057
2059 {
2060 if (!sub->enabled)
2061 ereport(ERROR,
2063 errmsg("%s is not allowed for disabled subscriptions",
2064 "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
2065
2067
2068 break;
2069 }
2070
2072 {
2073 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
2074
2075 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
2076 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
2077
2078 /*
2079 * If the user sets subskiplsn, we do a sanity check to make
2080 * sure that the specified LSN is a probable value.
2081 */
2082 if (XLogRecPtrIsValid(opts.lsn))
2083 {
2085 char originname[NAMEDATALEN];
2086 XLogRecPtr remote_lsn;
2087
2089 originname, sizeof(originname));
2091 remote_lsn = replorigin_get_progress(originid, false);
2092
2093 /* Check the given LSN is at least a future LSN */
2094 if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
2095 ereport(ERROR,
2097 errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
2098 LSN_FORMAT_ARGS(opts.lsn),
2099 LSN_FORMAT_ARGS(remote_lsn))));
2100 }
2101
2104
2105 update_tuple = true;
2106 break;
2107 }
2108
2109 default:
2110 elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
2111 stmt->kind);
2112 }
2113
2114 /* Update the catalog if needed. */
2115 if (update_tuple)
2116 {
2118 replaces);
2119
2120 CatalogTupleUpdate(rel, &tup->t_self, tup);
2121
2123 }
2124
2125 /*
2126 * Try to acquire the connection necessary either for modifying the slot
2127 * or for checking if the remote server permits enabling
2128 * retain_dead_tuples.
2129 *
2130 * This has to be at the end because otherwise if there is an error while
2131 * doing the database operations we won't be able to rollback altered
2132 * slot.
2133 */
2135 {
2136 bool must_use_password;
2137 char *err;
2139
2140 /* Load the library providing us libpq calls. */
2141 load_file("libpqwalreceiver", false);
2142
2143 /*
2144 * Try to connect to the publisher, using the new connection string if
2145 * available.
2146 */
2148 wrconn = walrcv_connect(new_conninfo ? new_conninfo : sub->conninfo,
2150 &err);
2151 if (!wrconn)
2152 ereport(ERROR,
2154 errmsg("subscription \"%s\" could not connect to the publisher: %s",
2155 sub->name, err)));
2156
2157 PG_TRY();
2158 {
2161
2163 retain_dead_tuples, origin, NULL, 0,
2164 sub->name);
2165
2168 update_failover ? &opts.failover : NULL,
2169 update_two_phase ? &opts.twophase : NULL);
2170 }
2171 PG_FINALLY();
2172 {
2174 }
2175 PG_END_TRY();
2176 }
2177
2179
2181
2182 /* Wake up related replication workers to handle this change quickly. */
2184
2185 return myself;
2186}
AclResult
Definition acl.h:183
@ ACLCHECK_OK
Definition acl.h:184
@ ACLCHECK_NOT_OWNER
Definition acl.h:186
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition aclchk.c:2672
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition aclchk.c:3880
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition aclchk.c:4134
void LogicalRepWorkersWakeupAtCommit(Oid subid)
Definition worker.c:6334
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:648
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define Assert(condition)
Definition c.h:943
uint32_t uint32
Definition c.h:624
@ DEPENDENCY_NORMAL
Definition dependency.h:33
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
int errcode(int sqlerrcode)
Definition elog.c:875
int errhint(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:374
#define WARNING
Definition elog.h:37
#define PG_END_TRY(...)
Definition elog.h:399
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define NOTICE
Definition elog.h:36
#define PG_FINALLY(...)
Definition elog.h:391
#define ereport(elevel,...)
Definition elog.h:152
void err(int eval, const char *fmt,...)
Definition err.c:43
#define DirectFunctionCall1(func, arg1)
Definition fmgr.h:688
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition foreign.c:185
char * ForeignServerConnectionString(Oid userid, ForeignServer *server)
Definition foreign.c:202
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition foreign.c:232
Oid MyDatabaseId
Definition globals.c:96
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1118
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define stmt
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
return true
Definition isn.c:130
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:303
void ApplyLauncherWakeupAtCommit(void)
Definition launcher.c:1185
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessExclusiveLock
Definition lockdefs.h:43
#define RowExclusiveLock
Definition lockdefs.h:38
Oid GetUserId(void)
Definition miscinit.c:470
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition miscinit.c:990
Datum namein(PG_FUNCTION_ARGS)
Definition name.c:48
static char * errmsg
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:243
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
Definition origin.c:1057
@ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SERVER
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH_SEQUENCES
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
#define ACL_USAGE
Definition parsenodes.h:84
@ OBJECT_SUBSCRIPTION
static AmcheckOptions opts
Definition pg_amcheck.c:112
#define NAMEDATALEN
void recordDependencyOn(const ObjectAddress *depender, const ObjectAddress *referenced, DependencyType behavior)
Definition pg_depend.c:51
long deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char deptype, Oid refclassId, Oid refobjectId)
Definition pg_depend.c:411
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
END_CATALOG_STRUCT typedef FormData_pg_subscription * Form_pg_subscription
int pg_strcasecmp(const char *s1, const char *s2)
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static Datum CStringGetDatum(const char *X)
Definition postgres.h:383
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define InvalidOid
unsigned int Oid
static int fb(int x)
#define RelationGetDescr(relation)
Definition rel.h:542
Definition pg_list.h:54
#define SUBOPT_STREAMING
#define SUBOPT_PASSWORD_REQUIRED
#define SUBOPT_SYNCHRONOUS_COMMIT
#define SUBOPT_ENABLED
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
#define SUBOPT_RETAIN_DEAD_TUPLES
#define SUBOPT_ORIGIN
static Datum publicationListToArray(List *publist)
#define SUBOPT_FAILOVER
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_SLOT_NAME
#define SUBOPT_COPY_DATA
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
#define SUBOPT_DISABLE_ON_ERR
static void parse_subscription_options(ParseState *pstate, List *stmt_options, uint32 supported_opts, SubOpts *opts)
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
static void AlterSubscription_refresh_seq(Subscription *sub)
#define SUBOPT_LSN
#define SUBOPT_MAX_RETENTION_DURATION
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
#define SUBOPT_WAL_RECEIVER_TIMEOUT
static void check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
#define SUBOPT_BINARY
#define IsSet(val, bits)
#define SUBOPT_REFRESH
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
bool superuser(void)
Definition superuser.c:47
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition syscache.h:93
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool LookupGXactBySubid(Oid subid)
Definition twophase.c:2803
const char * name
static WalReceiverConn * wrconn
Definition walreceiver.c:95
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_disconnect(conn)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3698
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21

References AccessExclusiveLock, ACL_USAGE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_REFRESH_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH_SEQUENCES, ALTER_SUBSCRIPTION_SERVER, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_SKIP, AlterSubscription_refresh(), AlterSubscription_refresh_seq(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum(), CatalogTupleUpdate(), CharGetDatum(), check_pub_dead_tuple_retention(), check_publications_origin_tables(), CheckAlterSubOption(), CheckSubDeadTupleRetention(), Subscription::conninfo, CStringGetDatum(), CStringGetTextDatum, deleteDependencyRecordsForSpecific(), DEPENDENCY_NORMAL, DirectFunctionCall1, elog, Subscription::enabled, ereport, err(), errcode(), errhint(), errmsg, ERROR, fb(), ForeignServerConnectionString(), Form_pg_subscription, GetForeignServerByName(), GETSTRUCT(), GetSubscription(), GetUserId(), GetUserMapping(), GetUserNameFromId(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, Int32GetDatum(), InvalidOid, InvokeObjectPostAlterHook, IsSet, load_file(), LockSharedObject(), logicalrep_workers_find(), LogicalRepWorkersWakeupAtCommit(), LookupGXactBySubid(), LSN_FORMAT_ARGS, LSNGetDatum(), Subscription::maxretention, merge_publications(), MyDatabaseId, Subscription::name, NAMEDATALEN, namein(), NOTICE, object_aclcheck(), object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, ObjectIdGetDatum(), opts, Subscription::origin, Subscription::ownersuperuser, parse_subscription_options(), Subscription::passwordrequired, PG_END_TRY, PG_FINALLY, pg_strcasecmp(), PG_TRY, PreventInTransactionBlock(), publicationListToArray(), Subscription::publications, recordDependencyOn(), RelationGetDescr, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_get_progress(), Subscription::retaindeadtuples, 
Subscription::retentionactive, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, stmt, SUBOPT_BINARY, SUBOPT_COPY_DATA, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_FAILOVER, SUBOPT_LSN, SUBOPT_MAX_RETENTION_DURATION, SUBOPT_ORIGIN, SUBOPT_PASSWORD_REQUIRED, SUBOPT_REFRESH, SUBOPT_RETAIN_DEAD_TUPLES, SUBOPT_RUN_AS_OWNER, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, SUBOPT_WAL_RECEIVER_TIMEOUT, superuser(), table_close(), table_open(), Subscription::twophasestate, values, walrcv_alter_slot, walrcv_check_conninfo, walrcv_connect, walrcv_disconnect, WARNING, wrconn, and XLogRecPtrIsValid.

Referenced by ProcessUtilitySlow().

◆ AlterSubscriptionOwner()

ObjectAddress AlterSubscriptionOwner ( const char *name,
Oid  newOwnerId 
)
extern

Definition at line 2648 of file subscriptioncmds.c.

2649{
2650 Oid subid;
2651 HeapTuple tup;
2652 Relation rel;
2653 ObjectAddress address;
2655
2657
2660
2661 if (!HeapTupleIsValid(tup))
2662 ereport(ERROR,
2664 errmsg("subscription \"%s\" does not exist", name)));
2665
2667 subid = form->oid;
2668
2670
2672
2674
2676
2677 return address;
2678}
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)

References AlterSubscriptionOwner_internal(), CStringGetDatum(), ereport, errcode(), errmsg, ERROR, fb(), Form_pg_subscription, GETSTRUCT(), heap_freetuple(), HeapTupleIsValid, MyDatabaseId, name, ObjectAddressSet, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy2, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

◆ AlterSubscriptionOwner_oid()

void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)
extern

Definition at line 2684 of file subscriptioncmds.c.

2685{
2686 HeapTuple tup;
2687 Relation rel;
2688
2690
2692
2693 if (!HeapTupleIsValid(tup))
2694 ereport(ERROR,
2696 errmsg("subscription with OID %u does not exist", subid)));
2697
2699
2701
2703}
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg, ERROR, fb(), heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by shdepReassignOwned_Owner().

◆ CheckSubDeadTupleRetention()

void CheckSubDeadTupleRetention ( bool  check_guc,
bool  sub_disabled,
int  elevel_for_sub_disabled,
bool  retain_dead_tuples,
bool  retention_active,
bool  max_retention_set 
)
extern

Definition at line 3055 of file subscriptioncmds.c.

3059{
3062
3064 {
3066 ereport(ERROR,
3068 errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
3069 errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
3070
3074 errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
3075 errhint("Consider setting \"%s\" to true.",
3076 "track_commit_timestamp"));
3077
3081 errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
3083 ? errhint("Consider setting %s to false.",
3084 "retain_dead_tuples") : 0);
3085 }
3086 else if (max_retention_set)
3087 {
3090 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
3091 }
3092}
bool track_commit_timestamp
Definition commit_ts.c:121
int wal_level
Definition xlog.c:138
@ WAL_LEVEL_REPLICA
Definition xlog.h:77

References Assert, ereport, errcode(), errhint(), errmsg, ERROR, fb(), NOTICE, track_commit_timestamp, wal_level, WAL_LEVEL_REPLICA, and WARNING.

Referenced by AlterSubscription(), CreateSubscription(), and DisableSubscriptionAndExit().

◆ CreateSubscription()

ObjectAddress CreateSubscription ( ParseState pstate,
CreateSubscriptionStmt stmt,
bool  isTopLevel 
)
extern

Definition at line 620 of file subscriptioncmds.c.

622{
623 Relation rel;
625 Oid subid;
626 bool nulls[Natts_pg_subscription];
628 Oid owner = GetUserId();
630 Oid serverid;
631 char *conninfo;
633 List *publications;
635 SubOpts opts = {0};
637
638 /*
639 * Parse and check options.
640 *
641 * Connection and publication should not be specified here.
642 */
653
654 /*
655 * Since creating a replication slot is not transactional, rolling back
656 * the transaction leaves the created replication slot. So we cannot run
657 * CREATE SUBSCRIPTION inside a transaction block if creating a
658 * replication slot.
659 */
660 if (opts.create_slot)
661 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
662
663 /*
664 * We don't want to allow unprivileged users to be able to trigger
665 * attempts to access arbitrary network destinations, so require the user
666 * to have been specifically authorized to create subscriptions.
667 */
671 errmsg("permission denied to create subscription"),
672 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
673 "pg_create_subscription")));
674
675 /*
676 * Since a subscription is a database object, we also check for CREATE
677 * permission on the database.
678 */
680 owner, ACL_CREATE);
681 if (aclresult != ACLCHECK_OK)
684
685 /*
686 * Non-superusers are required to set a password for authentication, and
687 * that password must be used by the target server, but the superuser can
688 * exempt a subscription from this requirement.
689 */
690 if (!opts.passwordrequired && !superuser_arg(owner))
693 errmsg("password_required=false is superuser-only"),
694 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
695
696 /*
697 * If built with appropriate switch, whine when regression-testing
698 * conventions for subscription names are violated.
699 */
700#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
701 if (strncmp(stmt->subname, "regress_", 8) != 0)
702 elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
703#endif
704
706
707 /* Check if name is used */
710 if (OidIsValid(subid))
711 {
714 errmsg("subscription \"%s\" already exists",
715 stmt->subname)));
716 }
717
718 /*
719 * Ensure that system configuration parameters are set appropriately to
720 * support retain_dead_tuples and max_retention_duration.
721 */
723 opts.retaindeadtuples, opts.retaindeadtuples,
724 (opts.maxretention > 0));
725
726 if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
727 opts.slot_name == NULL)
728 opts.slot_name = stmt->subname;
729
730 /* The default for synchronous_commit of subscriptions is off. */
731 if (opts.synchronous_commit == NULL)
732 opts.synchronous_commit = "off";
733
734 /*
735 * The default for wal_receiver_timeout of subscriptions is -1, which
736 * means the value is inherited from the server configuration, command
737 * line, or role/database settings.
738 */
739 if (opts.wal_receiver_timeout == NULL)
740 opts.wal_receiver_timeout = "-1";
741
742 /* Load the library providing us libpq calls. */
743 load_file("libpqwalreceiver", false);
744
745 if (stmt->servername)
746 {
747 ForeignServer *server;
748
749 Assert(!stmt->conninfo);
750 conninfo = NULL;
751
752 server = GetForeignServerByName(stmt->servername, false);
754 if (aclresult != ACLCHECK_OK)
756
757 /* make sure a user mapping exists */
758 GetUserMapping(owner, server->serverid);
759
760 serverid = server->serverid;
761 conninfo = ForeignServerConnectionString(owner, server);
762 }
763 else
764 {
765 Assert(stmt->conninfo);
766
767 serverid = InvalidOid;
768 conninfo = stmt->conninfo;
769 }
770
771 /* Check the connection info string. */
772 walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
773
774 publications = stmt->publication;
775
776 /* Everything ok, form a new tuple. */
777 memset(values, 0, sizeof(values));
778 memset(nulls, false, sizeof(nulls));
779
792 CharGetDatum(opts.twophase ?
800 BoolGetDatum(opts.retaindeadtuples);
802 Int32GetDatum(opts.maxretention);
804 BoolGetDatum(opts.retaindeadtuples);
806 if (!OidIsValid(serverid))
808 CStringGetTextDatum(conninfo);
809 else
810 nulls[Anum_pg_subscription_subconninfo - 1] = true;
811 if (opts.slot_name)
814 else
815 nulls[Anum_pg_subscription_subslotname - 1] = true;
817 CStringGetTextDatum(opts.synchronous_commit);
819 CStringGetTextDatum(opts.wal_receiver_timeout);
821 publicationListToArray(publications);
824
826
827 /* Insert tuple into catalog. */
830
832
834
835 if (stmt->servername)
836 {
838
839 Assert(OidIsValid(serverid));
840
843 }
844
845 /*
846 * A replication origin is currently created for all subscriptions,
847 * including those that only contain sequences or are otherwise empty.
848 *
849 * XXX: While this is technically unnecessary, optimizing it would require
850 * additional logic to skip origin creation during DDL operations and
851 * apply workers initialization, and to handle origin creation dynamically
852 * when tables are added to the subscription. It is not clear whether
853 * preventing creation of origins is worth additional complexity.
854 */
857
858 /*
859 * Connect to remote side to execute requested commands and fetch table
860 * and sequence info.
861 */
862 if (opts.connect)
863 {
864 char *err;
867
868 /* Try to connect to the publisher. */
869 must_use_password = !superuser_arg(owner) && opts.passwordrequired;
870 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
871 stmt->subname, &err);
872 if (!wrconn)
875 errmsg("subscription \"%s\" could not connect to the publisher: %s",
876 stmt->subname, err)));
877
878 PG_TRY();
879 {
880 bool has_tables = false;
881 List *pubrels;
882 char relation_state;
883
884 check_publications(wrconn, publications);
886 opts.copy_data,
887 opts.retaindeadtuples, opts.origin,
888 NULL, 0, stmt->subname);
890 opts.copy_data, opts.origin,
891 NULL, 0, stmt->subname);
892
893 if (opts.retaindeadtuples)
895
896 /*
897 * Set sync state based on whether we were asked to do data copy or
898 * not.
899 */
901
902 /*
903 * Build local relation status info. Relations are for both tables
904 * and sequences from the publisher.
905 */
906 pubrels = fetch_relation_list(wrconn, publications);
907
909 {
910 Oid relid;
911 char relkind;
912 RangeVar *rv = pubrelinfo->rv;
913
914 relid = RangeVarGetRelid(rv, AccessShareLock, false);
915 relkind = get_rel_relkind(relid);
916
917 /* Check for supported relkind. */
918 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
919 rv->schemaname, rv->relname);
920 has_tables |= (relkind != RELKIND_SEQUENCE);
922 InvalidXLogRecPtr, true);
923 }
924
925 /*
926 * If requested, create permanent slot for the subscription. We
927 * won't use the initial snapshot for anything, so no need to
928 * export it.
929 *
930 * XXX: Similar to origins, it is not clear whether preventing the
931 * slot creation for empty and sequence-only subscriptions is
932 * worth additional complexity.
933 */
934 if (opts.create_slot)
935 {
936 bool twophase_enabled = false;
937
938 Assert(opts.slot_name);
939
940 /*
941 * Even if two_phase is set, don't create the slot with
942 * two-phase enabled. Will enable it once all the tables are
943 * synced and ready. This avoids race-conditions like prepared
944 * transactions being skipped due to changes not being applied
945 * due to checks in should_apply_changes_for_rel() when
946 * tablesync for the corresponding tables is in progress. See
947 * comments atop worker.c.
948 *
949 * Note that if tables were specified but copy_data is false
950 * then it is safe to enable two_phase up-front because those
951 * tables are already initially in READY state. When the
952 * subscription has no tables, we leave the twophase state as
953 * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
954 * PUBLICATION to work.
955 */
956 if (opts.twophase && !opts.copy_data && has_tables)
957 twophase_enabled = true;
958
960 opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
961
964
966 (errmsg("created replication slot \"%s\" on publisher",
967 opts.slot_name)));
968 }
969 }
970 PG_FINALLY();
971 {
973 }
974 PG_END_TRY();
975 }
976 else
978 (errmsg("subscription was created, but is not connected"),
979 errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
980
982
984
985 /*
986 * Notify the launcher to start the apply worker if the subscription is
987 * enabled, or to create the conflict detection slot if retain_dead_tuples
988 * is enabled.
989 *
990 * Creating the conflict detection slot is essential even when the
991 * subscription is not enabled. This ensures that dead tuples are
992 * retained, which is necessary for accurately identifying the type of
993 * conflict during replication.
994 */
995 if (opts.enabled || opts.retaindeadtuples)
997
999
1000 return myself;
1001}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5314
#define OidIsValid(objectId)
Definition c.h:858
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition catalog.c:448
int errdetail(const char *fmt,...) pg_attribute_printf(1
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
#define AccessShareLock
Definition lockdefs.h:36
char * get_database_name(Oid dbid)
Definition lsyscache.c:1384
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2309
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition namespace.h:98
#define InvokeObjectPostCreateHook(classId, objectId, subId)
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:274
@ OBJECT_DATABASE
@ OBJECT_FOREIGN_SERVER
#define ACL_CREATE
Definition parsenodes.h:85
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
void pgstat_create_subscription(Oid subid)
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
char * servername
Definition foreign.h:40
char * relname
Definition primnodes.h:84
char * schemaname
Definition primnodes.h:81
#define SUBOPT_CREATE_SLOT
static void check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static void check_publications(WalReceiverConn *wrconn, List *publications)
static List * fetch_relation_list(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_CONNECT
bool superuser_arg(Oid roleid)
Definition superuser.c:57
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition syscache.h:111
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition tablesync.c:1681
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References AccessShareLock, ACL_CREATE, ACL_USAGE, aclcheck_error(), ACLCHECK_OK, AddSubscriptionRelState(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum(), CatalogTupleInsert(), CharGetDatum(), check_pub_dead_tuple_retention(), check_publications(), check_publications_origin_sequences(), check_publications_origin_tables(), CheckSubDeadTupleRetention(), CheckSubscriptionRelkind(), CRS_NOEXPORT_SNAPSHOT, CStringGetDatum(), CStringGetTextDatum, DEPENDENCY_NORMAL, DirectFunctionCall1, elog, ereport, err(), errcode(), ERRCODE_DUPLICATE_OBJECT, errdetail(), errhint(), errmsg, ERROR, fb(), fetch_relation_list(), foreach_ptr, ForeignServerConnectionString(), get_database_name(), get_rel_relkind(), GetForeignServerByName(), GetNewOidWithIndex(), GetSysCacheOid2, GetUserId(), GetUserMapping(), has_privs_of_role(), heap_form_tuple(), heap_freetuple(), Int32GetDatum(), InvalidOid, InvalidXLogRecPtr, InvokeObjectPostCreateHook, IsSet, load_file(), LSNGetDatum(), MyDatabaseId, NAMEDATALEN, namein(), NOTICE, object_aclcheck(), OBJECT_DATABASE, OBJECT_FOREIGN_SERVER, ObjectAddressSet, ObjectIdGetDatum(), OidIsValid, opts, parse_subscription_options(), PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_create_subscription(), PreventInTransactionBlock(), publicationListToArray(), RangeVarGetRelid, recordDependencyOn(), recordDependencyOnOwner(), RelationGetDescr, RangeVar::relname, ReplicationOriginNameForLogicalRep(), replorigin_create(), RowExclusiveLock, RangeVar::schemaname, ForeignServer::serverid, ForeignServer::servername, stmt, SUBOPT_BINARY, SUBOPT_CONNECT, SUBOPT_COPY_DATA, SUBOPT_CREATE_SLOT, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_FAILOVER, SUBOPT_MAX_RETENTION_DURATION, SUBOPT_ORIGIN, SUBOPT_PASSWORD_REQUIRED, SUBOPT_RETAIN_DEAD_TUPLES, SUBOPT_RUN_AS_OWNER, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, SUBOPT_WAL_RECEIVER_TIMEOUT, superuser(), superuser_arg(), table_close(), table_open(), UpdateTwoPhaseState(), values, 
walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, WARNING, and wrconn.

Referenced by ProcessUtilitySlow().

◆ defGetStreamingMode()

char defGetStreamingMode ( DefElem def)
extern

Definition at line 3386 of file subscriptioncmds.c.

3387{
3388 /*
3389 * If no parameter value given, assume "true" is meant.
3390 */
3391 if (!def->arg)
3392 return LOGICALREP_STREAM_ON;
3393
3394 /*
3395 * Allow 0, 1, "false", "true", "off", "on" or "parallel".
3396 */
3397 switch (nodeTag(def->arg))
3398 {
3399 case T_Integer:
3400 switch (intVal(def->arg))
3401 {
3402 case 0:
3403 return LOGICALREP_STREAM_OFF;
3404 case 1:
3405 return LOGICALREP_STREAM_ON;
3406 default:
3407 /* otherwise, error out below */
3408 break;
3409 }
3410 break;
3411 default:
3412 {
3413 char *sval = defGetString(def);
3414
3415 /*
3416 * The set of strings accepted here should match up with the
3417 * grammar's opt_boolean_or_string production.
3418 */
3419 if (pg_strcasecmp(sval, "false") == 0 ||
3420 pg_strcasecmp(sval, "off") == 0)
3421 return LOGICALREP_STREAM_OFF;
3422 if (pg_strcasecmp(sval, "true") == 0 ||
3423 pg_strcasecmp(sval, "on") == 0)
3424 return LOGICALREP_STREAM_ON;
3425 if (pg_strcasecmp(sval, "parallel") == 0)
3427 }
3428 break;
3429 }
3430
3431 ereport(ERROR,
3433 errmsg("%s requires a Boolean value or \"parallel\"",
3434 def->defname)));
3435 return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
3436}
char * defGetString(DefElem *def)
Definition define.c:34
#define nodeTag(nodeptr)
Definition nodes.h:139
char * defname
Definition parsenodes.h:860
Node * arg
Definition parsenodes.h:861
#define intVal(v)
Definition value.h:79

References DefElem::arg, defGetString(), DefElem::defname, ereport, errcode(), errmsg, ERROR, fb(), intVal, nodeTag, and pg_strcasecmp().

Referenced by parse_output_parameters(), and parse_subscription_options().

◆ DropSubscription()

void DropSubscription ( DropSubscriptionStmt stmt,
bool  isTopLevel 
)
extern

Definition at line 2192 of file subscriptioncmds.c.

2193{
2194 Relation rel;
2196 HeapTuple tup;
2197 Oid subid;
2198 Oid subowner;
2199 Datum datum;
2200 bool isnull;
2201 char *subname;
2202 char *conninfo;
2203 char *slotname;
2205 ListCell *lc;
2206 char originname[NAMEDATALEN];
2207 char *err = NULL;
2210 List *rstates;
2211 bool must_use_password;
2212
2213 /*
2214 * The launcher may concurrently start a new worker for this subscription.
2215 * During initialization, the worker checks for subscription validity and
2216 * exits if the subscription has already been dropped. See
2217 * InitializeLogRepWorker.
2218 */
2220
2222 CStringGetDatum(stmt->subname));
2223
2224 if (!HeapTupleIsValid(tup))
2225 {
2226 table_close(rel, NoLock);
2227
2228 if (!stmt->missing_ok)
2229 ereport(ERROR,
2231 errmsg("subscription \"%s\" does not exist",
2232 stmt->subname)));
2233 else
2235 (errmsg("subscription \"%s\" does not exist, skipping",
2236 stmt->subname)));
2237
2238 return;
2239 }
2240
2242 subid = form->oid;
2243 subowner = form->subowner;
2244 must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
2245
2246 /* must be owner */
2249 stmt->subname);
2250
2251 /* DROP hook for the subscription being removed */
2253
2254 /*
2255 * Lock the subscription so nobody else can do anything with it (including
2256 * the replication workers).
2257 */
2259
2260 /* Get subname */
2263 subname = pstrdup(NameStr(*DatumGetName(datum)));
2264
2265 /* Get conninfo */
2266 if (OidIsValid(form->subserver))
2267 {
2269 ForeignServer *server;
2270
2271 server = GetForeignServer(form->subserver);
2273 form->subowner, ACL_USAGE);
2274 if (aclresult != ACLCHECK_OK)
2275 {
2276 /*
2277 * Unable to generate connection string because permissions on the
2278 * foreign server have been removed. Follow the same logic as an
2279 * unusable subconninfo (which will result in an ERROR later
2280 * unless slot_name = NONE).
2281 */
2282 err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
2283 GetUserNameFromId(form->subowner, false),
2284 server->servername);
2285 conninfo = NULL;
2286 }
2287 else
2288 conninfo = ForeignServerConnectionString(form->subowner,
2289 server);
2290 }
2291 else
2292 {
2295 conninfo = TextDatumGetCString(datum);
2296 }
2297
2298 /* Get slotname */
2301 if (!isnull)
2302 slotname = pstrdup(NameStr(*DatumGetName(datum)));
2303 else
2304 slotname = NULL;
2305
2306 /*
2307 * Since dropping a replication slot is not transactional, the replication
2308 * slot stays dropped even if the transaction rolls back. So we cannot
2309 * run DROP SUBSCRIPTION inside a transaction block if dropping the
2310 * replication slot. Also, in this case, we report a message for dropping
2311 * the subscription to the cumulative stats system.
2312 *
2313 * XXX The command name should really be something like "DROP SUBSCRIPTION
2314 * of a subscription that is associated with a replication slot", but we
2315 * don't have the proper facilities for that.
2316 */
2317 if (slotname)
2318 PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
2319
2322
2323 /* Remove the tuple from catalog. */
2324 CatalogTupleDelete(rel, &tup->t_self);
2325
2327
2328 /*
2329 * Stop all the subscription workers immediately.
2330 *
2331 * This is necessary if we are dropping the replication slot, so that the
2332 * slot becomes accessible.
2333 *
2334 * It is also necessary if the subscription is disabled and was disabled
2335 * in the same transaction. Then the workers haven't seen the disabling
2336 * yet and will still be running, leading to hangs later when we want to
2337 * drop the replication origin. If the subscription was disabled before
2338 * this transaction, then there shouldn't be any workers left, so this
2339 * won't make a difference.
2340 *
2341 * New workers won't be started because we hold an exclusive lock on the
2342 * subscription till the end of the transaction.
2343 */
2344 subworkers = logicalrep_workers_find(subid, false, true);
2345 foreach(lc, subworkers)
2346 {
2348
2350 }
2352
2353 /*
2354 * Remove the no-longer-useful entry in the launcher's table of apply
2355 * worker start times.
2356 *
2357 * If this transaction rolls back, the launcher might restart a failed
2358 * apply worker before wal_retrieve_retry_interval milliseconds have
2359 * elapsed, but that's pretty harmless.
2360 */
2362
2363 /*
2364 * Cleanup of tablesync replication origins.
2365 *
2366 * Any READY-state relations would already have dealt with clean-ups.
2367 *
2368 * Note that the state can't change because we have already stopped both
2369 * the apply and tablesync workers and they can't restart because of
2370 * exclusive lock on the subscription.
2371 */
2372 rstates = GetSubscriptionRelations(subid, true, false, true);
2373 foreach(lc, rstates)
2374 {
2376 Oid relid = rstate->relid;
2377
2378 /* Only cleanup resources of tablesync workers */
2379 if (!OidIsValid(relid))
2380 continue;
2381
2382 /*
2383 * Drop the tablesync's origin tracking if it exists.
2384 *
2385 * It is possible that the origin is not yet created for tablesync
2386 * worker so passing missing_ok = true. This can happen for the states
2387 * before SUBREL_STATE_DATASYNC.
2388 */
2390 sizeof(originname));
2391 replorigin_drop_by_name(originname, true, false);
2392 }
2393
2394 /* Clean up dependencies */
2397
2398 /* Remove any associated relation synchronization states. */
2400
2401 /* Remove the origin tracking if it exists. */
2403 replorigin_drop_by_name(originname, true, false);
2404
2405 /*
2406 * Tell the cumulative stats system that the subscription is getting
2407 * dropped.
2408 */
2410
2411 /*
2412 * If there is no slot associated with the subscription, we can finish
2413 * here.
2414 */
2415 if (!slotname && rstates == NIL)
2416 {
2417 table_close(rel, NoLock);
2418 return;
2419 }
2420
2421 /*
2422 * Try to acquire the connection necessary for dropping slots.
2423 *
2424 * Note: If the slotname is NONE/NULL then we allow the command to finish
2425 * and users need to manually clean up the apply and tablesync worker slots
2426 * later.
2427 *
2428 * This has to be at the end because otherwise if there is an error while
2429 * doing the database operations we won't be able to roll back the dropped
2430 * slot.
2431 */
2432 load_file("libpqwalreceiver", false);
2433
2434 if (conninfo)
2435 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2436 subname, &err);
2437
2438 if (wrconn == NULL)
2439 {
2440 if (!slotname)
2441 {
2442 /* be tidy */
2444 table_close(rel, NoLock);
2445 return;
2446 }
2447 else
2448 {
2449 ReportSlotConnectionError(rstates, subid, slotname, err);
2450 }
2451 }
2452
2453 PG_TRY();
2454 {
2455 foreach(lc, rstates)
2456 {
2458 Oid relid = rstate->relid;
2459
2460 /* Only cleanup resources of tablesync workers */
2461 if (!OidIsValid(relid))
2462 continue;
2463
2464 /*
2465 * Drop the tablesync slots associated with removed tables.
2466 *
2467 * For SYNCDONE/READY states, the tablesync slot is known to have
2468 * already been dropped by the tablesync worker.
2469 *
2470 * For other states, there is no certainty, maybe the slot does
2471 * not exist yet. Also, if we fail after removing some of the
2472 * slots, next time, it will again try to drop already dropped
2473 * slots and fail. For these reasons, we allow missing_ok = true
2474 * for the drop.
2475 */
2476 if (rstate->state != SUBREL_STATE_SYNCDONE)
2477 {
2478 char syncslotname[NAMEDATALEN] = {0};
2479
2481 sizeof(syncslotname));
2483 }
2484 }
2485
2487
2488 /*
2489 * If there is a slot associated with the subscription, then drop the
2490 * replication slot at the publisher.
2491 */
2492 if (slotname)
2493 ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2494 }
2495 PG_FINALLY();
2496 {
2498 }
2499 PG_END_TRY();
2500
2501 table_close(rel, NoLock);
2502}
#define TextDatumGetCString(d)
Definition builtins.h:99
#define NameStr(name)
Definition c.h:835
#define _(x)
Definition elog.c:96
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
ForeignServer * GetForeignServer(Oid serverid)
Definition foreign.c:114
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:662
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1155
void list_free(List *list)
Definition list.c:1546
#define NoLock
Definition lockdefs.h:34
char * pstrdup(const char *in)
Definition mcxt.c:1910
#define InvokeObjectDropHook(classId, objectId, subId)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:459
long deleteDependencyRecordsFor(Oid classId, Oid objectId, bool skipExtensionDeps)
Definition pg_depend.c:314
#define lfirst(lc)
Definition pg_list.h:172
#define NIL
Definition pg_list.h:68
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void RemoveSubscriptionRel(Oid subid, Oid relid)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
NameData subname
void pgstat_drop_subscription(Oid subid)
static Name DatumGetName(Datum X)
Definition postgres.h:393
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
LogicalRepWorkerType type
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache2(SysCacheIdentifier cacheId, Datum key1, Datum key2)
Definition syscache.c:231
Datum SysCacheGetAttrNotNull(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition syscache.c:626
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:596
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition tablesync.c:1236

References _, AccessExclusiveLock, ACL_USAGE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, ApplyLauncherForgetWorkerStartTime(), CatalogTupleDelete(), CStringGetDatum(), DatumGetName(), deleteDependencyRecordsFor(), deleteSharedDependencyRecordsFor(), ereport, err(), errcode(), errmsg, ERROR, EventTriggerSQLDropAddObject(), fb(), ForeignServerConnectionString(), Form_pg_subscription, GetForeignServer(), GETSTRUCT(), GetSubscriptionRelations(), GetUserId(), GetUserNameFromId(), HeapTupleIsValid, InvalidOid, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), MyDatabaseId, NAMEDATALEN, NameStr, NIL, NoLock, NOTICE, object_aclcheck(), object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, ObjectIdGetDatum(), OidIsValid, PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_drop_subscription(), PreventInTransactionBlock(), psprintf(), pstrdup(), ReleaseSysCache(), SubscriptionRelState::relid, LogicalRepWorker::relid, RemoveSubscriptionRel(), ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), ReportSlotConnectionError(), RowExclusiveLock, SearchSysCache2(), ForeignServer::servername, SubscriptionRelState::state, stmt, LogicalRepWorker::subid, subname, superuser_arg(), SysCacheGetAttr(), SysCacheGetAttrNotNull(), table_close(), table_open(), TextDatumGetCString, LogicalRepWorker::type, walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by ProcessUtilitySlow().