PostgreSQL Source Code git master
Loading...
Searching...
No Matches
subscriptioncmds.h File Reference
Include dependency graph for subscriptioncmds.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

ObjectAddress CreateSubscription (ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscription (ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 
char defGetStreamingMode (DefElem *def)
 
void CheckSubDeadTupleRetention (bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
 

Function Documentation

◆ AlterSubscription()

ObjectAddress AlterSubscription ( ParseState pstate,
AlterSubscriptionStmt stmt,
bool  isTopLevel 
)
extern

Definition at line 1413 of file subscriptioncmds.c.

1415{
1416 Relation rel;
1418 bool nulls[Natts_pg_subscription];
1421 HeapTuple tup;
1422 Oid subid;
1423 bool update_tuple = false;
1424 bool update_failover = false;
1425 bool update_two_phase = false;
1426 bool check_pub_rdt = false;
1427 bool retain_dead_tuples;
1428 int max_retention;
1429 bool retention_active;
1430 char *new_conninfo = NULL;
1431 char *origin;
1432 Subscription *sub;
1435 SubOpts opts = {0};
1436
1438
1439 /* Fetch the existing tuple. */
1441 CStringGetDatum(stmt->subname));
1442
1443 if (!HeapTupleIsValid(tup))
1444 ereport(ERROR,
1446 errmsg("subscription \"%s\" does not exist",
1447 stmt->subname)));
1448
1450 subid = form->oid;
1451
1452 /* must be owner */
1455 stmt->subname);
1456
1457 /*
1458 * Skip ACL checks on the subscription's foreign server, if any. If
1459 * changing the server (or replacing it with a raw connection), then the
1460 * old one will be removed anyway. If changing something unrelated,
1461 * there's no need to do an additional ACL check here; that will be done
1462 * by the subscription worker anyway.
1463 */
1464 sub = GetSubscription(subid, false, false);
1465
1467 origin = sub->origin;
1470
1471 /*
1472 * Don't allow non-superuser modification of a subscription with
1473 * password_required=false.
1474 */
1475 if (!sub->passwordrequired && !superuser())
1476 ereport(ERROR,
1478 errmsg("password_required=false is superuser-only"),
1479 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1480
1481 /* Lock the subscription so nobody else can do anything with it. */
1483
1484 /* Form a new tuple. */
1485 memset(values, 0, sizeof(values));
1486 memset(nulls, false, sizeof(nulls));
1487 memset(replaces, false, sizeof(replaces));
1488
1490
1491 switch (stmt->kind)
1492 {
1494 {
1505
1506 parse_subscription_options(pstate, stmt->options,
1508
1509 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1510 {
1511 /*
1512 * The subscription must be disabled to allow slot_name as
1513 * 'none', otherwise, the apply worker will repeatedly try
1514 * to stream the data using that slot_name which neither
1515 * exists on the publisher nor the user will be allowed to
1516 * create it.
1517 */
1518 if (sub->enabled && !opts.slot_name)
1519 ereport(ERROR,
1521 errmsg("cannot set %s for enabled subscription",
1522 "slot_name = NONE")));
1523
1524 if (opts.slot_name)
1527 else
1528 nulls[Anum_pg_subscription_subslotname - 1] = true;
1530 }
1531
1532 if (opts.synchronous_commit)
1533 {
1535 CStringGetTextDatum(opts.synchronous_commit);
1537 }
1538
1539 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1540 {
1542 BoolGetDatum(opts.binary);
1544 }
1545
1546 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1547 {
1549 CharGetDatum(opts.streaming);
1551 }
1552
1553 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1554 {
1556 = BoolGetDatum(opts.disableonerr);
1558 = true;
1559 }
1560
1561 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1562 {
1563 /* Non-superuser may not disable password_required. */
1564 if (!opts.passwordrequired && !superuser())
1565 ereport(ERROR,
1567 errmsg("password_required=false is superuser-only"),
1568 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1569
1571 = BoolGetDatum(opts.passwordrequired);
1573 = true;
1574 }
1575
1576 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1577 {
1579 BoolGetDatum(opts.runasowner);
1581 }
1582
1583 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1584 {
1585 /*
1586 * We need to update both the slot and the subscription
1587 * for the two_phase option. We can enable the two_phase
1588 * option for a slot only once the initial data
1589 * synchronization is done. This is to avoid missing some
1590 * data as explained in comments atop worker.c.
1591 */
1592 update_two_phase = !opts.twophase;
1593
1594 CheckAlterSubOption(sub, "two_phase", update_two_phase,
1595 isTopLevel);
1596
1597 /*
1598 * Modifying the two_phase slot option requires a slot
1599 * lookup by slot name, so changing the slot name at the
1600 * same time is not allowed.
1601 */
1602 if (update_two_phase &&
1603 IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1604 ereport(ERROR,
1606 errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1607
1608 /*
1609 * Note that workers may still survive even if the
1610 * subscription has been disabled.
1611 *
1612 * Ensure workers have already been exited to avoid
1613 * getting prepared transactions while we are disabling
1614 * the two_phase option. Otherwise, the changes of an
1615 * already prepared transaction can be replicated again
1616 * along with its corresponding commit, leading to
1617 * duplicate data or errors.
1618 */
1619 if (logicalrep_workers_find(subid, true, true))
1620 ereport(ERROR,
1622 errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1623 errhint("Try again after some time.")));
1624
1625 /*
1626 * two_phase cannot be disabled if there are any
1627 * uncommitted prepared transactions present otherwise it
1628 * can lead to duplicate data or errors as explained in
1629 * the comment above.
1630 */
1631 if (update_two_phase &&
1633 LookupGXactBySubid(subid))
1634 ereport(ERROR,
1636 errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1637 errhint("Resolve these transactions and try again.")));
1638
1639 /* Change system catalog accordingly */
1641 CharGetDatum(opts.twophase ?
1645 }
1646
1647 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1648 {
1649 /*
1650 * Similar to the two_phase case above, we need to update
1651 * the failover option for both the slot and the
1652 * subscription.
1653 */
1654 update_failover = true;
1655
1656 CheckAlterSubOption(sub, "failover", update_failover,
1657 isTopLevel);
1658
1660 BoolGetDatum(opts.failover);
1662 }
1663
1664 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1665 {
1667 BoolGetDatum(opts.retaindeadtuples);
1669
1670 /*
1671 * Update the retention status only if there's a change in
1672 * the retain_dead_tuples option value.
1673 *
1674 * Automatically marking retention as active when
1675 * retain_dead_tuples is enabled may not always be ideal,
1676 * especially if retention was previously stopped and the
1677 * user toggles retain_dead_tuples without adjusting the
1678 * publisher workload. However, this behavior provides a
1679 * convenient way for users to manually refresh the
1680 * retention status. Since retention will be stopped again
1681 * unless the publisher workload is reduced, this approach
1682 * is acceptable for now.
1683 */
1684 if (opts.retaindeadtuples != sub->retaindeadtuples)
1685 {
1687 BoolGetDatum(opts.retaindeadtuples);
1689
1690 retention_active = opts.retaindeadtuples;
1691 }
1692
1693 CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1694
1695 /*
1696 * Workers may continue running even after the
1697 * subscription has been disabled.
1698 *
1699 * To prevent race conditions (as described in
1700 * CheckAlterSubOption()), ensure that all worker
1701 * processes have already exited before proceeding.
1702 */
1703 if (logicalrep_workers_find(subid, true, true))
1704 ereport(ERROR,
1706 errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1707 errhint("Try again after some time.")));
1708
1709 /*
1710 * Notify the launcher to manage the replication slot for
1711 * conflict detection. This ensures that replication slot
1712 * is efficiently handled (created, updated, or dropped)
1713 * in response to any configuration changes.
1714 */
1716
1717 check_pub_rdt = opts.retaindeadtuples;
1718 retain_dead_tuples = opts.retaindeadtuples;
1719 }
1720
1721 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1722 {
1724 Int32GetDatum(opts.maxretention);
1726
1727 max_retention = opts.maxretention;
1728 }
1729
1730 /*
1731 * Ensure that system configuration parameters are set
1732 * appropriately to support retain_dead_tuples and
1733 * max_retention_duration.
1734 */
1735 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1736 IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1740 (max_retention > 0));
1741
1742 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1743 {
1745 CStringGetTextDatum(opts.origin);
1747
1748 /*
1749 * Check if changes from different origins may be received
1750 * from the publisher when the origin is changed to ANY
1751 * and retain_dead_tuples is enabled.
1752 */
1755
1756 origin = opts.origin;
1757 }
1758
1759 if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
1760 {
1762 CStringGetTextDatum(opts.wal_receiver_timeout);
1764 }
1765
1766 update_tuple = true;
1767 break;
1768 }
1769
1771 {
1772 parse_subscription_options(pstate, stmt->options,
1774 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1775
1776 if (!sub->slotname && opts.enabled)
1777 ereport(ERROR,
1779 errmsg("cannot enable subscription that does not have a slot name")));
1780
1781 /*
1782 * Check track_commit_timestamp only when enabling the
1783 * subscription in case it was disabled after creation. See
1784 * comments atop CheckSubDeadTupleRetention() for details.
1785 */
1786 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1788 sub->retentionactive, false);
1789
1791 BoolGetDatum(opts.enabled);
1793
1794 if (opts.enabled)
1796
1797 update_tuple = true;
1798
1799 /*
1800 * The subscription might be initially created with
1801 * connect=false and retain_dead_tuples=true, meaning the
1802 * remote server's status may not be checked. Ensure this
1803 * check is conducted now.
1804 */
1805 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1806 break;
1807 }
1808
1810 {
1814
1815 /*
1816 * Remove what was there before, either another foreign server
1817 * or a connection string.
1818 */
1819 if (form->subserver)
1820 {
1823 ForeignServerRelationId, form->subserver);
1824 }
1825 else
1826 {
1827 nulls[Anum_pg_subscription_subconninfo - 1] = true;
1829 }
1830
1831 /*
1832 * Check that the subscription owner has USAGE privileges on
1833 * the server.
1834 */
1835 new_server = GetForeignServerByName(stmt->servername, false);
1837 new_server->serverid,
1838 form->subowner, ACL_USAGE);
1839 if (aclresult != ACLCHECK_OK)
1840 ereport(ERROR,
1842 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
1843 GetUserNameFromId(form->subowner, false),
1844 new_server->servername));
1845
1846 /* make sure a user mapping exists */
1847 GetUserMapping(form->subowner, new_server->serverid);
1848
1850 new_server);
1851
1852 /* Load the library providing us libpq calls. */
1853 load_file("libpqwalreceiver", false);
1854 /* Check the connection info string. */
1856 sub->passwordrequired && !sub->ownersuperuser);
1857
1860
1863
1864 update_tuple = true;
1865 }
1866
1867 /*
1868 * Since the remote server configuration might have changed,
1869 * perform a check to ensure it permits enabling
1870 * retain_dead_tuples.
1871 */
1873 break;
1874
1876 /* remove reference to foreign server and dependencies, if present */
1877 if (form->subserver)
1878 {
1881 ForeignServerRelationId, form->subserver);
1882
1885 }
1886
1887 new_conninfo = stmt->conninfo;
1888
1889 /* Load the library providing us libpq calls. */
1890 load_file("libpqwalreceiver", false);
1891 /* Check the connection info string. */
1893 sub->passwordrequired && !sub->ownersuperuser);
1894
1896 CStringGetTextDatum(stmt->conninfo);
1898 update_tuple = true;
1899
1900 /*
1901 * Since the remote server configuration might have changed,
1902 * perform a check to ensure it permits enabling
1903 * retain_dead_tuples.
1904 */
1906 break;
1907
1909 {
1911 parse_subscription_options(pstate, stmt->options,
1913
1915 publicationListToArray(stmt->publication);
1917
1918 update_tuple = true;
1919
1920 /* Refresh if user asked us to. */
1921 if (opts.refresh)
1922 {
1923 if (!sub->enabled)
1924 ereport(ERROR,
1926 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1927 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1928
1929 /*
1930 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1931 * why this is not allowed.
1932 */
1933 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1934 ereport(ERROR,
1936 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1937 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1938
1939 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1940
1941 /* Make sure refresh sees the new list of publications. */
1942 sub->publications = stmt->publication;
1943
1944 AlterSubscription_refresh(sub, opts.copy_data,
1945 stmt->publication);
1946 }
1947
1948 break;
1949 }
1950
1953 {
1954 List *publist;
1956
1958 parse_subscription_options(pstate, stmt->options,
1960
1961 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1965
1966 update_tuple = true;
1967
1968 /* Refresh if user asked us to. */
1969 if (opts.refresh)
1970 {
1971 /* We only need to validate user specified publications. */
1972 List *validate_publications = (isadd) ? stmt->publication : NULL;
1973
1974 if (!sub->enabled)
1975 ereport(ERROR,
1977 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1978 /* translator: %s is an SQL ALTER command */
1979 errhint("Use %s instead.",
1980 isadd ?
1981 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1982 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1983
1984 /*
1985 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1986 * why this is not allowed.
1987 */
1988 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1989 ereport(ERROR,
1991 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1992 /* translator: %s is an SQL ALTER command */
1993 errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1994 isadd ?
1995 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1996 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1997
1998 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1999
2000 /* Refresh the new list of publications. */
2001 sub->publications = publist;
2002
2003 AlterSubscription_refresh(sub, opts.copy_data,
2005 }
2006
2007 break;
2008 }
2009
2011 {
2012 if (!sub->enabled)
2013 ereport(ERROR,
2015 errmsg("%s is not allowed for disabled subscriptions",
2016 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
2017
2018 parse_subscription_options(pstate, stmt->options,
2020
2021 /*
2022 * The subscription option "two_phase" requires that
2023 * replication has passed the initial table synchronization
2024 * phase before the two_phase becomes properly enabled.
2025 *
2026 * But, having reached this two-phase commit "enabled" state
2027 * we must not allow any subsequent table initialization to
2028 * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
2029 * disallowed when the user had requested two_phase = on mode.
2030 *
2031 * The exception to this restriction is when copy_data =
2032 * false, because when copy_data is false the tablesync will
2033 * start already in READY state and will exit directly without
2034 * doing anything.
2035 *
2036 * For more details see comments atop worker.c.
2037 */
2038 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
2039 ereport(ERROR,
2041 errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
2042 errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
2043
2044 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
2045
2046 AlterSubscription_refresh(sub, opts.copy_data, NULL);
2047
2048 break;
2049 }
2050
2052 {
2053 if (!sub->enabled)
2054 ereport(ERROR,
2056 errmsg("%s is not allowed for disabled subscriptions",
2057 "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
2058
2060
2061 break;
2062 }
2063
2065 {
2066 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
2067
2068 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
2069 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
2070
2071 /*
2072 * If the user sets subskiplsn, we do a sanity check to make
2073 * sure that the specified LSN is a probable value.
2074 */
2075 if (XLogRecPtrIsValid(opts.lsn))
2076 {
2078 char originname[NAMEDATALEN];
2079 XLogRecPtr remote_lsn;
2080
2082 originname, sizeof(originname));
2084 remote_lsn = replorigin_get_progress(originid, false);
2085
2086 /* Check the given LSN is at least a future LSN */
2087 if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
2088 ereport(ERROR,
2090 errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
2091 LSN_FORMAT_ARGS(opts.lsn),
2092 LSN_FORMAT_ARGS(remote_lsn))));
2093 }
2094
2097
2098 update_tuple = true;
2099 break;
2100 }
2101
2102 default:
2103 elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
2104 stmt->kind);
2105 }
2106
2107 /* Update the catalog if needed. */
2108 if (update_tuple)
2109 {
2111 replaces);
2112
2113 CatalogTupleUpdate(rel, &tup->t_self, tup);
2114
2116 }
2117
2118 /*
2119 * Try to acquire the connection necessary either for modifying the slot
2120 * or for checking if the remote server permits enabling
2121 * retain_dead_tuples.
2122 *
2123 * This has to be at the end because otherwise if there is an error while
2124 * doing the database operations we won't be able to rollback altered
2125 * slot.
2126 */
2128 {
2129 bool must_use_password;
2130 char *err;
2132
2133 /* Load the library providing us libpq calls. */
2134 load_file("libpqwalreceiver", false);
2135
2136 /*
2137 * Try to connect to the publisher, using the new connection string if
2138 * available.
2139 */
2141 wrconn = walrcv_connect(new_conninfo ? new_conninfo : sub->conninfo,
2143 &err);
2144 if (!wrconn)
2145 ereport(ERROR,
2147 errmsg("subscription \"%s\" could not connect to the publisher: %s",
2148 sub->name, err)));
2149
2150 PG_TRY();
2151 {
2154
2156 retain_dead_tuples, origin, NULL, 0,
2157 sub->name);
2158
2161 update_failover ? &opts.failover : NULL,
2162 update_two_phase ? &opts.twophase : NULL);
2163 }
2164 PG_FINALLY();
2165 {
2167 }
2168 PG_END_TRY();
2169 }
2170
2172
2174
2175 /* Wake up related replication workers to handle this change quickly. */
2177
2178 return myself;
2179}
AclResult
Definition acl.h:183
@ ACLCHECK_OK
Definition acl.h:184
@ ACLCHECK_NOT_OWNER
Definition acl.h:186
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition aclchk.c:2672
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition aclchk.c:3880
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition aclchk.c:4134
void LogicalRepWorkersWakeupAtCommit(Oid subid)
Definition worker.c:6334
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:648
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define Assert(condition)
Definition c.h:943
uint32_t uint32
Definition c.h:624
@ DEPENDENCY_NORMAL
Definition dependency.h:33
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
int errcode(int sqlerrcode)
Definition elog.c:875
int errhint(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:374
#define WARNING
Definition elog.h:37
#define PG_END_TRY(...)
Definition elog.h:399
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define NOTICE
Definition elog.h:36
#define PG_FINALLY(...)
Definition elog.h:391
#define ereport(elevel,...)
Definition elog.h:152
void err(int eval, const char *fmt,...)
Definition err.c:43
#define DirectFunctionCall1(func, arg1)
Definition fmgr.h:688
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition foreign.c:185
char * ForeignServerConnectionString(Oid userid, ForeignServer *server)
Definition foreign.c:202
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition foreign.c:232
Oid MyDatabaseId
Definition globals.c:96
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1118
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define stmt
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
return true
Definition isn.c:130
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:303
void ApplyLauncherWakeupAtCommit(void)
Definition launcher.c:1185
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessExclusiveLock
Definition lockdefs.h:43
#define RowExclusiveLock
Definition lockdefs.h:38
Oid GetUserId(void)
Definition miscinit.c:470
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition miscinit.c:990
Datum namein(PG_FUNCTION_ARGS)
Definition name.c:48
static char * errmsg
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:243
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
Definition origin.c:1057
@ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SERVER
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH_SEQUENCES
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
#define ACL_USAGE
Definition parsenodes.h:84
@ OBJECT_SUBSCRIPTION
static AmcheckOptions opts
Definition pg_amcheck.c:112
#define NAMEDATALEN
void recordDependencyOn(const ObjectAddress *depender, const ObjectAddress *referenced, DependencyType behavior)
Definition pg_depend.c:47
long deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char deptype, Oid refclassId, Oid refobjectId)
Definition pg_depend.c:400
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
END_CATALOG_STRUCT typedef FormData_pg_subscription * Form_pg_subscription
int pg_strcasecmp(const char *s1, const char *s2)
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static Datum CStringGetDatum(const char *X)
Definition postgres.h:383
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define InvalidOid
unsigned int Oid
static int fb(int x)
#define RelationGetDescr(relation)
Definition rel.h:542
Definition pg_list.h:54
#define SUBOPT_STREAMING
#define SUBOPT_PASSWORD_REQUIRED
#define SUBOPT_SYNCHRONOUS_COMMIT
#define SUBOPT_ENABLED
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
#define SUBOPT_RETAIN_DEAD_TUPLES
#define SUBOPT_ORIGIN
static Datum publicationListToArray(List *publist)
#define SUBOPT_FAILOVER
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_SLOT_NAME
#define SUBOPT_COPY_DATA
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
#define SUBOPT_DISABLE_ON_ERR
static void parse_subscription_options(ParseState *pstate, List *stmt_options, uint32 supported_opts, SubOpts *opts)
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
static void AlterSubscription_refresh_seq(Subscription *sub)
#define SUBOPT_LSN
#define SUBOPT_MAX_RETENTION_DURATION
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
#define SUBOPT_WAL_RECEIVER_TIMEOUT
static void check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
#define SUBOPT_BINARY
#define IsSet(val, bits)
#define SUBOPT_REFRESH
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
bool superuser(void)
Definition superuser.c:47
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition syscache.h:93
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool LookupGXactBySubid(Oid subid)
Definition twophase.c:2803
const char * name
static WalReceiverConn * wrconn
Definition walreceiver.c:95
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_disconnect(conn)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3698
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21

References AccessExclusiveLock, ACL_USAGE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_REFRESH_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH_SEQUENCES, ALTER_SUBSCRIPTION_SERVER, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_SKIP, AlterSubscription_refresh(), AlterSubscription_refresh_seq(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum(), CatalogTupleUpdate(), CharGetDatum(), check_pub_dead_tuple_retention(), check_publications_origin_tables(), CheckAlterSubOption(), CheckSubDeadTupleRetention(), Subscription::conninfo, CStringGetDatum(), CStringGetTextDatum, deleteDependencyRecordsForSpecific(), DEPENDENCY_NORMAL, DirectFunctionCall1, elog, Subscription::enabled, ereport, err(), errcode(), errhint(), errmsg, ERROR, fb(), ForeignServerConnectionString(), Form_pg_subscription, GetForeignServerByName(), GETSTRUCT(), GetSubscription(), GetUserId(), GetUserMapping(), GetUserNameFromId(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, Int32GetDatum(), InvalidOid, InvokeObjectPostAlterHook, IsSet, load_file(), LockSharedObject(), logicalrep_workers_find(), LogicalRepWorkersWakeupAtCommit(), LookupGXactBySubid(), LSN_FORMAT_ARGS, LSNGetDatum(), Subscription::maxretention, merge_publications(), MyDatabaseId, Subscription::name, NAMEDATALEN, namein(), NOTICE, object_aclcheck(), object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, ObjectIdGetDatum(), opts, Subscription::origin, Subscription::ownersuperuser, parse_subscription_options(), Subscription::passwordrequired, PG_END_TRY, PG_FINALLY, pg_strcasecmp(), PG_TRY, PreventInTransactionBlock(), publicationListToArray(), Subscription::publications, recordDependencyOn(), RelationGetDescr, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_get_progress(), Subscription::retaindeadtuples, Subscription::retentionactive, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, stmt, SUBOPT_BINARY, SUBOPT_COPY_DATA, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_FAILOVER, SUBOPT_LSN, SUBOPT_MAX_RETENTION_DURATION, SUBOPT_ORIGIN, SUBOPT_PASSWORD_REQUIRED, SUBOPT_REFRESH, SUBOPT_RETAIN_DEAD_TUPLES, SUBOPT_RUN_AS_OWNER, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, SUBOPT_WAL_RECEIVER_TIMEOUT, superuser(), table_close(), table_open(), Subscription::twophasestate, values, walrcv_alter_slot, walrcv_check_conninfo, walrcv_connect, walrcv_disconnect, WARNING, wrconn, and XLogRecPtrIsValid.

Referenced by ProcessUtilitySlow().

◆ AlterSubscriptionOwner()

ObjectAddress AlterSubscriptionOwner ( const char name,
Oid  newOwnerId 
)
extern

Definition at line 2641 of file subscriptioncmds.c.

2642{
2643 Oid subid;
2644 HeapTuple tup;
2645 Relation rel;
2646 ObjectAddress address;
2648
2650
2653
2654 if (!HeapTupleIsValid(tup))
2655 ereport(ERROR,
2657 errmsg("subscription \"%s\" does not exist", name)));
2658
2660 subid = form->oid;
2661
2663
2665
2667
2669
2670 return address;
2671}
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)

References AlterSubscriptionOwner_internal(), CStringGetDatum(), ereport, errcode(), errmsg, ERROR, fb(), Form_pg_subscription, GETSTRUCT(), heap_freetuple(), HeapTupleIsValid, MyDatabaseId, name, ObjectAddressSet, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy2, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

◆ AlterSubscriptionOwner_oid()

void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)
extern

Definition at line 2677 of file subscriptioncmds.c.

2678{
2679 HeapTuple tup;
2680 Relation rel;
2681
2683
2685
2686 if (!HeapTupleIsValid(tup))
2687 ereport(ERROR,
2689 errmsg("subscription with OID %u does not exist", subid)));
2690
2692
2694
2696}
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg, ERROR, fb(), heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by shdepReassignOwned_Owner().

◆ CheckSubDeadTupleRetention()

void CheckSubDeadTupleRetention ( bool  check_guc,
bool  sub_disabled,
int  elevel_for_sub_disabled,
bool  retain_dead_tuples,
bool  retention_active,
bool  max_retention_set 
)
extern

Definition at line 3048 of file subscriptioncmds.c.

3052{
3055
3057 {
3059 ereport(ERROR,
3061 errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
3062 errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
3063
3067 errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
3068 errhint("Consider setting \"%s\" to true.",
3069 "track_commit_timestamp"));
3070
3074 errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
3076 ? errhint("Consider setting %s to false.",
3077 "retain_dead_tuples") : 0);
3078 }
3079 else if (max_retention_set)
3080 {
3083 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
3084 }
3085}
bool track_commit_timestamp
Definition commit_ts.c:121
int wal_level
Definition xlog.c:138
@ WAL_LEVEL_REPLICA
Definition xlog.h:77

References Assert, ereport, errcode(), errhint(), errmsg, ERROR, fb(), NOTICE, track_commit_timestamp, wal_level, WAL_LEVEL_REPLICA, and WARNING.

Referenced by AlterSubscription(), CreateSubscription(), and DisableSubscriptionAndExit().

◆ CreateSubscription()

ObjectAddress CreateSubscription ( ParseState pstate,
CreateSubscriptionStmt stmt,
bool  isTopLevel 
)
extern

Definition at line 615 of file subscriptioncmds.c.

617{
618 Relation rel;
620 Oid subid;
621 bool nulls[Natts_pg_subscription];
623 Oid owner = GetUserId();
625 Oid serverid;
626 char *conninfo;
628 List *publications;
630 SubOpts opts = {0};
632
633 /*
634 * Parse and check options.
635 *
636 * Connection and publication should not be specified here.
637 */
648
649 /*
650 * Since creating a replication slot is not transactional, rolling back
651 * the transaction leaves the created replication slot. So we cannot run
652 * CREATE SUBSCRIPTION inside a transaction block if creating a
653 * replication slot.
654 */
655 if (opts.create_slot)
656 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
657
658 /*
659 * We don't want to allow unprivileged users to be able to trigger
660 * attempts to access arbitrary network destinations, so require the user
661 * to have been specifically authorized to create subscriptions.
662 */
666 errmsg("permission denied to create subscription"),
667 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
668 "pg_create_subscription")));
669
670 /*
671 * Since a subscription is a database object, we also check for CREATE
672 * permission on the database.
673 */
675 owner, ACL_CREATE);
676 if (aclresult != ACLCHECK_OK)
679
680 /*
681 * Non-superusers are required to set a password for authentication, and
682 * that password must be used by the target server, but the superuser can
683 * exempt a subscription from this requirement.
684 */
685 if (!opts.passwordrequired && !superuser_arg(owner))
688 errmsg("password_required=false is superuser-only"),
689 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
690
691 /*
692 * If built with appropriate switch, whine when regression-testing
693 * conventions for subscription names are violated.
694 */
695#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
696 if (strncmp(stmt->subname, "regress_", 8) != 0)
697 elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
698#endif
699
701
702 /* Check if name is used */
705 if (OidIsValid(subid))
706 {
709 errmsg("subscription \"%s\" already exists",
710 stmt->subname)));
711 }
712
713 /*
714 * Ensure that system configuration parameters are set appropriately to
715 * support retain_dead_tuples and max_retention_duration.
716 */
718 opts.retaindeadtuples, opts.retaindeadtuples,
719 (opts.maxretention > 0));
720
721 if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
722 opts.slot_name == NULL)
723 opts.slot_name = stmt->subname;
724
725 /* The default for synchronous_commit of subscriptions is off. */
726 if (opts.synchronous_commit == NULL)
727 opts.synchronous_commit = "off";
728
729 /*
730 * The default for wal_receiver_timeout of subscriptions is -1, which
731 * means the value is inherited from the server configuration, command
732 * line, or role/database settings.
733 */
734 if (opts.wal_receiver_timeout == NULL)
735 opts.wal_receiver_timeout = "-1";
736
737 /* Load the library providing us libpq calls. */
738 load_file("libpqwalreceiver", false);
739
740 if (stmt->servername)
741 {
742 ForeignServer *server;
743
744 Assert(!stmt->conninfo);
745 conninfo = NULL;
746
747 server = GetForeignServerByName(stmt->servername, false);
749 if (aclresult != ACLCHECK_OK)
751
752 /* make sure a user mapping exists */
753 GetUserMapping(owner, server->serverid);
754
755 serverid = server->serverid;
756 conninfo = ForeignServerConnectionString(owner, server);
757 }
758 else
759 {
760 Assert(stmt->conninfo);
761
762 serverid = InvalidOid;
763 conninfo = stmt->conninfo;
764 }
765
766 /* Check the connection info string. */
767 walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
768
769 publications = stmt->publication;
770
771 /* Everything ok, form a new tuple. */
772 memset(values, 0, sizeof(values));
773 memset(nulls, false, sizeof(nulls));
774
787 CharGetDatum(opts.twophase ?
795 BoolGetDatum(opts.retaindeadtuples);
797 Int32GetDatum(opts.maxretention);
799 BoolGetDatum(opts.retaindeadtuples);
801 if (!OidIsValid(serverid))
803 CStringGetTextDatum(conninfo);
804 else
805 nulls[Anum_pg_subscription_subconninfo - 1] = true;
806 if (opts.slot_name)
809 else
810 nulls[Anum_pg_subscription_subslotname - 1] = true;
812 CStringGetTextDatum(opts.synchronous_commit);
814 CStringGetTextDatum(opts.wal_receiver_timeout);
816 publicationListToArray(publications);
819
821
822 /* Insert tuple into catalog. */
825
827
829
830 if (stmt->servername)
831 {
833
834 Assert(OidIsValid(serverid));
835
838 }
839
840 /*
841 * A replication origin is currently created for all subscriptions,
842 * including those that only contain sequences or are otherwise empty.
843 *
844 * XXX: While this is technically unnecessary, optimizing it would require
845 * additional logic to skip origin creation during DDL operations and
846 * apply workers initialization, and to handle origin creation dynamically
847 * when tables are added to the subscription. It is not clear whether
848 * preventing creation of origins is worth additional complexity.
849 */
852
853 /*
854 * Connect to remote side to execute requested commands and fetch table
855 * and sequence info.
856 */
857 if (opts.connect)
858 {
859 char *err;
862
863 /* Try to connect to the publisher. */
864 must_use_password = !superuser_arg(owner) && opts.passwordrequired;
865 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
866 stmt->subname, &err);
867 if (!wrconn)
870 errmsg("subscription \"%s\" could not connect to the publisher: %s",
871 stmt->subname, err)));
872
873 PG_TRY();
874 {
875 bool has_tables = false;
876 List *pubrels;
877 char relation_state;
878
879 check_publications(wrconn, publications);
881 opts.copy_data,
882 opts.retaindeadtuples, opts.origin,
883 NULL, 0, stmt->subname);
885 opts.copy_data, opts.origin,
886 NULL, 0, stmt->subname);
887
888 if (opts.retaindeadtuples)
890
891 /*
892 * Set sync state based on if we were asked to do data copy or
893 * not.
894 */
896
897 /*
898 * Build local relation status info. Relations are for both tables
899 * and sequences from the publisher.
900 */
901 pubrels = fetch_relation_list(wrconn, publications);
902
904 {
905 Oid relid;
906 char relkind;
907 RangeVar *rv = pubrelinfo->rv;
908
909 relid = RangeVarGetRelid(rv, AccessShareLock, false);
910 relkind = get_rel_relkind(relid);
911
912 /* Check for supported relkind. */
913 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
914 rv->schemaname, rv->relname);
915 has_tables |= (relkind != RELKIND_SEQUENCE);
917 InvalidXLogRecPtr, true);
918 }
919
920 /*
921 * If requested, create permanent slot for the subscription. We
922 * won't use the initial snapshot for anything, so no need to
923 * export it.
924 *
925 * XXX: Similar to origins, it is not clear whether preventing the
926 * slot creation for empty and sequence-only subscriptions is
927 * worth additional complexity.
928 */
929 if (opts.create_slot)
930 {
931 bool twophase_enabled = false;
932
933 Assert(opts.slot_name);
934
935 /*
936 * Even if two_phase is set, don't create the slot with
937 * two-phase enabled. Will enable it once all the tables are
938 * synced and ready. This avoids race-conditions like prepared
939 * transactions being skipped due to changes not being applied
940 * due to checks in should_apply_changes_for_rel() when
941 * tablesync for the corresponding tables are in progress. See
942 * comments atop worker.c.
943 *
944 * Note that if tables were specified but copy_data is false
945 * then it is safe to enable two_phase up-front because those
946 * tables are already initially in READY state. When the
947 * subscription has no tables, we leave the twophase state as
948 * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
949 * PUBLICATION to work.
950 */
951 if (opts.twophase && !opts.copy_data && has_tables)
952 twophase_enabled = true;
953
955 opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
956
959
961 (errmsg("created replication slot \"%s\" on publisher",
962 opts.slot_name)));
963 }
964 }
965 PG_FINALLY();
966 {
968 }
969 PG_END_TRY();
970 }
971 else
973 (errmsg("subscription was created, but is not connected"),
974 errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
975
977
979
980 /*
981 * Notify the launcher to start the apply worker if the subscription is
982 * enabled, or to create the conflict detection slot if retain_dead_tuples
983 * is enabled.
984 *
985 * Creating the conflict detection slot is essential even when the
986 * subscription is not enabled. This ensures that dead tuples are
987 * retained, which is necessary for accurately identifying the type of
988 * conflict during replication.
989 */
990 if (opts.enabled || opts.retaindeadtuples)
992
994
995 return myself;
996}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5314
#define OidIsValid(objectId)
Definition c.h:858
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition catalog.c:448
int errdetail(const char *fmt,...) pg_attribute_printf(1
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
#define AccessShareLock
Definition lockdefs.h:36
char * get_database_name(Oid dbid)
Definition lsyscache.c:1323
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2234
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition namespace.h:98
#define InvokeObjectPostCreateHook(classId, objectId, subId)
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:274
@ OBJECT_DATABASE
@ OBJECT_FOREIGN_SERVER
#define ACL_CREATE
Definition parsenodes.h:85
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
void pgstat_create_subscription(Oid subid)
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
char * servername
Definition foreign.h:40
char * relname
Definition primnodes.h:84
char * schemaname
Definition primnodes.h:81
#define SUBOPT_CREATE_SLOT
static void check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static void check_publications(WalReceiverConn *wrconn, List *publications)
static List * fetch_relation_list(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_CONNECT
bool superuser_arg(Oid roleid)
Definition superuser.c:57
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition syscache.h:111
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition tablesync.c:1681
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References AccessShareLock, ACL_CREATE, ACL_USAGE, aclcheck_error(), ACLCHECK_OK, AddSubscriptionRelState(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum(), CatalogTupleInsert(), CharGetDatum(), check_pub_dead_tuple_retention(), check_publications(), check_publications_origin_sequences(), check_publications_origin_tables(), CheckSubDeadTupleRetention(), CheckSubscriptionRelkind(), CRS_NOEXPORT_SNAPSHOT, CStringGetDatum(), CStringGetTextDatum, DEPENDENCY_NORMAL, DirectFunctionCall1, elog, ereport, err(), errcode(), ERRCODE_DUPLICATE_OBJECT, errdetail(), errhint(), errmsg, ERROR, fb(), fetch_relation_list(), foreach_ptr, ForeignServerConnectionString(), get_database_name(), get_rel_relkind(), GetForeignServerByName(), GetNewOidWithIndex(), GetSysCacheOid2, GetUserId(), GetUserMapping(), has_privs_of_role(), heap_form_tuple(), heap_freetuple(), Int32GetDatum(), InvalidOid, InvalidXLogRecPtr, InvokeObjectPostCreateHook, IsSet, load_file(), LSNGetDatum(), MyDatabaseId, NAMEDATALEN, namein(), NOTICE, object_aclcheck(), OBJECT_DATABASE, OBJECT_FOREIGN_SERVER, ObjectAddressSet, ObjectIdGetDatum(), OidIsValid, opts, parse_subscription_options(), PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_create_subscription(), PreventInTransactionBlock(), publicationListToArray(), RangeVarGetRelid, recordDependencyOn(), recordDependencyOnOwner(), RelationGetDescr, RangeVar::relname, ReplicationOriginNameForLogicalRep(), replorigin_create(), RowExclusiveLock, RangeVar::schemaname, ForeignServer::serverid, ForeignServer::servername, stmt, SUBOPT_BINARY, SUBOPT_CONNECT, SUBOPT_COPY_DATA, SUBOPT_CREATE_SLOT, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_FAILOVER, SUBOPT_MAX_RETENTION_DURATION, SUBOPT_ORIGIN, SUBOPT_PASSWORD_REQUIRED, SUBOPT_RETAIN_DEAD_TUPLES, SUBOPT_RUN_AS_OWNER, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, SUBOPT_WAL_RECEIVER_TIMEOUT, superuser(), superuser_arg(), table_close(), table_open(), UpdateTwoPhaseState(), values, walrcv_check_conninfo, walrcv_connect, walrcv_create_slot, walrcv_disconnect, WARNING, and wrconn.

Referenced by ProcessUtilitySlow().

◆ defGetStreamingMode()

char defGetStreamingMode ( DefElem def)
extern

Definition at line 3379 of file subscriptioncmds.c.

3380{
3381 /*
3382 * If no parameter value given, assume "true" is meant.
3383 */
3384 if (!def->arg)
3385 return LOGICALREP_STREAM_ON;
3386
3387 /*
3388 * Allow 0, 1, "false", "true", "off", "on" or "parallel".
3389 */
3390 switch (nodeTag(def->arg))
3391 {
3392 case T_Integer:
3393 switch (intVal(def->arg))
3394 {
3395 case 0:
3396 return LOGICALREP_STREAM_OFF;
3397 case 1:
3398 return LOGICALREP_STREAM_ON;
3399 default:
3400 /* otherwise, error out below */
3401 break;
3402 }
3403 break;
3404 default:
3405 {
3406 char *sval = defGetString(def);
3407
3408 /*
3409 * The set of strings accepted here should match up with the
3410 * grammar's opt_boolean_or_string production.
3411 */
3412 if (pg_strcasecmp(sval, "false") == 0 ||
3413 pg_strcasecmp(sval, "off") == 0)
3414 return LOGICALREP_STREAM_OFF;
3415 if (pg_strcasecmp(sval, "true") == 0 ||
3416 pg_strcasecmp(sval, "on") == 0)
3417 return LOGICALREP_STREAM_ON;
3418 if (pg_strcasecmp(sval, "parallel") == 0)
3420 }
3421 break;
3422 }
3423
3424 ereport(ERROR,
3426 errmsg("%s requires a Boolean value or \"parallel\"",
3427 def->defname)));
3428 return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
3429}
char * defGetString(DefElem *def)
Definition define.c:34
#define nodeTag(nodeptr)
Definition nodes.h:139
char * defname
Definition parsenodes.h:860
Node * arg
Definition parsenodes.h:861
#define intVal(v)
Definition value.h:79

References DefElem::arg, defGetString(), DefElem::defname, ereport, errcode(), errmsg, ERROR, fb(), intVal, nodeTag, and pg_strcasecmp().

Referenced by parse_output_parameters(), and parse_subscription_options().

◆ DropSubscription()

void DropSubscription ( DropSubscriptionStmt stmt,
bool  isTopLevel 
)
extern

Definition at line 2185 of file subscriptioncmds.c.

2186{
2187 Relation rel;
2189 HeapTuple tup;
2190 Oid subid;
2191 Oid subowner;
2192 Datum datum;
2193 bool isnull;
2194 char *subname;
2195 char *conninfo;
2196 char *slotname;
2198 ListCell *lc;
2199 char originname[NAMEDATALEN];
2200 char *err = NULL;
2203 List *rstates;
2204 bool must_use_password;
2205
2206 /*
2207 * The launcher may concurrently start a new worker for this subscription.
2208 * During initialization, the worker checks for subscription validity and
2209 * exits if the subscription has already been dropped. See
2210 * InitializeLogRepWorker.
2211 */
2213
2215 CStringGetDatum(stmt->subname));
2216
2217 if (!HeapTupleIsValid(tup))
2218 {
2219 table_close(rel, NoLock);
2220
2221 if (!stmt->missing_ok)
2222 ereport(ERROR,
2224 errmsg("subscription \"%s\" does not exist",
2225 stmt->subname)));
2226 else
2228 (errmsg("subscription \"%s\" does not exist, skipping",
2229 stmt->subname)));
2230
2231 return;
2232 }
2233
2235 subid = form->oid;
2236 subowner = form->subowner;
2237 must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
2238
2239 /* must be owner */
2242 stmt->subname);
2243
2244 /* DROP hook for the subscription being removed */
2246
2247 /*
2248 * Lock the subscription so nobody else can do anything with it (including
2249 * the replication workers).
2250 */
2252
2253 /* Get subname */
2256 subname = pstrdup(NameStr(*DatumGetName(datum)));
2257
2258 /* Get conninfo */
2259 if (OidIsValid(form->subserver))
2260 {
2262 ForeignServer *server;
2263
2264 server = GetForeignServer(form->subserver);
2266 form->subowner, ACL_USAGE);
2267 if (aclresult != ACLCHECK_OK)
2268 {
2269 /*
2270 * Unable to generate connection string because permissions on the
2271 * foreign server have been removed. Follow the same logic as an
2272 * unusable subconninfo (which will result in an ERROR later
2273 * unless slot_name = NONE).
2274 */
2275 err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
2276 GetUserNameFromId(form->subowner, false),
2277 server->servername);
2278 conninfo = NULL;
2279 }
2280 else
2281 conninfo = ForeignServerConnectionString(form->subowner,
2282 server);
2283 }
2284 else
2285 {
2288 conninfo = TextDatumGetCString(datum);
2289 }
2290
2291 /* Get slotname */
2294 if (!isnull)
2295 slotname = pstrdup(NameStr(*DatumGetName(datum)));
2296 else
2297 slotname = NULL;
2298
2299 /*
2300 * Since dropping a replication slot is not transactional, the replication
2301 * slot stays dropped even if the transaction rolls back. So we cannot
2302 * run DROP SUBSCRIPTION inside a transaction block if dropping the
2303 * replication slot. Also, in this case, we report a message for dropping
2304 * the subscription to the cumulative stats system.
2305 *
2306 * XXX The command name should really be something like "DROP SUBSCRIPTION
2307 * of a subscription that is associated with a replication slot", but we
2308 * don't have the proper facilities for that.
2309 */
2310 if (slotname)
2311 PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
2312
2315
2316 /* Remove the tuple from catalog. */
2317 CatalogTupleDelete(rel, &tup->t_self);
2318
2320
2321 /*
2322 * Stop all the subscription workers immediately.
2323 *
2324 * This is necessary if we are dropping the replication slot, so that the
2325 * slot becomes accessible.
2326 *
2327 * It is also necessary if the subscription is disabled and was disabled
2328 * in the same transaction. Then the workers haven't seen the disabling
2329 * yet and will still be running, leading to hangs later when we want to
2330 * drop the replication origin. If the subscription was disabled before
2331 * this transaction, then there shouldn't be any workers left, so this
2332 * won't make a difference.
2333 *
2334 * New workers won't be started because we hold an exclusive lock on the
2335 * subscription till the end of the transaction.
2336 */
2337 subworkers = logicalrep_workers_find(subid, false, true);
2338 foreach(lc, subworkers)
2339 {
2341
2343 }
2345
2346 /*
2347 * Remove the no-longer-useful entry in the launcher's table of apply
2348 * worker start times.
2349 *
2350 * If this transaction rolls back, the launcher might restart a failed
2351 * apply worker before wal_retrieve_retry_interval milliseconds have
2352 * elapsed, but that's pretty harmless.
2353 */
2355
2356 /*
2357 * Cleanup of tablesync replication origins.
2358 *
2359 * Any READY-state relations would already have dealt with clean-ups.
2360 *
2361 * Note that the state can't change because we have already stopped both
2362 * the apply and tablesync workers and they can't restart because of
2363 * exclusive lock on the subscription.
2364 */
2365 rstates = GetSubscriptionRelations(subid, true, false, true);
2366 foreach(lc, rstates)
2367 {
2369 Oid relid = rstate->relid;
2370
2371 /* Only cleanup resources of tablesync workers */
2372 if (!OidIsValid(relid))
2373 continue;
2374
2375 /*
2376 * Drop the tablesync's origin tracking if exists.
2377 *
2378 * It is possible that the origin is not yet created for tablesync
2379 * worker so passing missing_ok = true. This can happen for the states
2380 * before SUBREL_STATE_DATASYNC.
2381 */
2383 sizeof(originname));
2384 replorigin_drop_by_name(originname, true, false);
2385 }
2386
2387 /* Clean up dependencies */
2390
2391 /* Remove any associated relation synchronization states. */
2393
2394 /* Remove the origin tracking if exists. */
2396 replorigin_drop_by_name(originname, true, false);
2397
2398 /*
2399 * Tell the cumulative stats system that the subscription is getting
2400 * dropped.
2401 */
2403
2404 /*
2405 * If there is no slot associated with the subscription, we can finish
2406 * here.
2407 */
2408 if (!slotname && rstates == NIL)
2409 {
2410 table_close(rel, NoLock);
2411 return;
2412 }
2413
2414 /*
2415 * Try to acquire the connection necessary for dropping slots.
2416 *
2417 * Note: If the slotname is NONE/NULL then we allow the command to finish
2418 * and users need to manually cleanup the apply and tablesync worker slots
2419 * later.
2420 *
2421 * This has to be at the end because otherwise if there is an error while
2422 * doing the database operations we won't be able to rollback dropped
2423 * slot.
2424 */
2425 load_file("libpqwalreceiver", false);
2426
2427 if (conninfo)
2428 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2429 subname, &err);
2430
2431 if (wrconn == NULL)
2432 {
2433 if (!slotname)
2434 {
2435 /* be tidy */
2437 table_close(rel, NoLock);
2438 return;
2439 }
2440 else
2441 {
2442 ReportSlotConnectionError(rstates, subid, slotname, err);
2443 }
2444 }
2445
2446 PG_TRY();
2447 {
2448 foreach(lc, rstates)
2449 {
2451 Oid relid = rstate->relid;
2452
2453 /* Only cleanup resources of tablesync workers */
2454 if (!OidIsValid(relid))
2455 continue;
2456
2457 /*
2458 * Drop the tablesync slots associated with removed tables.
2459 *
2460 * For SYNCDONE/READY states, the tablesync slot is known to have
2461 * already been dropped by the tablesync worker.
2462 *
2463 * For other states, there is no certainty, maybe the slot does
2464 * not exist yet. Also, if we fail after removing some of the
2465 * slots, next time, it will again try to drop already dropped
2466 * slots and fail. For these reasons, we allow missing_ok = true
2467 * for the drop.
2468 */
2469 if (rstate->state != SUBREL_STATE_SYNCDONE)
2470 {
2471 char syncslotname[NAMEDATALEN] = {0};
2472
2474 sizeof(syncslotname));
2476 }
2477 }
2478
2480
2481 /*
2482 * If there is a slot associated with the subscription, then drop the
2483 * replication slot at the publisher.
2484 */
2485 if (slotname)
2486 ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2487 }
2488 PG_FINALLY();
2489 {
2491 }
2492 PG_END_TRY();
2493
2494 table_close(rel, NoLock);
2495}
#define TextDatumGetCString(d)
Definition builtins.h:99
#define NameStr(name)
Definition c.h:835
#define _(x)
Definition elog.c:96
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
ForeignServer * GetForeignServer(Oid serverid)
Definition foreign.c:114
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:662
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1155
void list_free(List *list)
Definition list.c:1546
#define NoLock
Definition lockdefs.h:34
char * pstrdup(const char *in)
Definition mcxt.c:1910
#define InvokeObjectDropHook(classId, objectId, subId)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:459
long deleteDependencyRecordsFor(Oid classId, Oid objectId, bool skipExtensionDeps)
Definition pg_depend.c:303
#define lfirst(lc)
Definition pg_list.h:172
#define NIL
Definition pg_list.h:68
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void RemoveSubscriptionRel(Oid subid, Oid relid)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
NameData subname
void pgstat_drop_subscription(Oid subid)
static Name DatumGetName(Datum X)
Definition postgres.h:393
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
LogicalRepWorkerType type
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache2(SysCacheIdentifier cacheId, Datum key1, Datum key2)
Definition syscache.c:231
Datum SysCacheGetAttrNotNull(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition syscache.c:626
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:596
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition tablesync.c:1236

References _, AccessExclusiveLock, ACL_USAGE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, ApplyLauncherForgetWorkerStartTime(), CatalogTupleDelete(), CStringGetDatum(), DatumGetName(), deleteDependencyRecordsFor(), deleteSharedDependencyRecordsFor(), ereport, err(), errcode(), errmsg, ERROR, EventTriggerSQLDropAddObject(), fb(), ForeignServerConnectionString(), Form_pg_subscription, GetForeignServer(), GETSTRUCT(), GetSubscriptionRelations(), GetUserId(), GetUserNameFromId(), HeapTupleIsValid, InvalidOid, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), MyDatabaseId, NAMEDATALEN, NameStr, NIL, NoLock, NOTICE, object_aclcheck(), object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, ObjectIdGetDatum(), OidIsValid, PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_drop_subscription(), PreventInTransactionBlock(), psprintf(), pstrdup(), ReleaseSysCache(), SubscriptionRelState::relid, LogicalRepWorker::relid, RemoveSubscriptionRel(), ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), ReportSlotConnectionError(), RowExclusiveLock, SearchSysCache2(), ForeignServer::servername, SubscriptionRelState::state, stmt, LogicalRepWorker::subid, subname, superuser_arg(), SysCacheGetAttr(), SysCacheGetAttrNotNull(), table_close(), table_open(), TextDatumGetCString, LogicalRepWorker::type, walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by ProcessUtilitySlow().