PostgreSQL Source Code git master
Loading...
Searching...
No Matches
subscriptioncmds.c File Reference
#include "postgres.h"
#include "access/commit_ts.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_database_d.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/syscache.h"
Include dependency graph for subscriptioncmds.c:

Go to the source code of this file.

Data Structures

struct  SubOpts
 
struct  PublicationRelKind
 

Macros

#define SUBOPT_CONNECT   0x00000001
 
#define SUBOPT_ENABLED   0x00000002
 
#define SUBOPT_CREATE_SLOT   0x00000004
 
#define SUBOPT_SLOT_NAME   0x00000008
 
#define SUBOPT_COPY_DATA   0x00000010
 
#define SUBOPT_SYNCHRONOUS_COMMIT   0x00000020
 
#define SUBOPT_REFRESH   0x00000040
 
#define SUBOPT_BINARY   0x00000080
 
#define SUBOPT_STREAMING   0x00000100
 
#define SUBOPT_TWOPHASE_COMMIT   0x00000200
 
#define SUBOPT_DISABLE_ON_ERR   0x00000400
 
#define SUBOPT_PASSWORD_REQUIRED   0x00000800
 
#define SUBOPT_RUN_AS_OWNER   0x00001000
 
#define SUBOPT_FAILOVER   0x00002000
 
#define SUBOPT_RETAIN_DEAD_TUPLES   0x00004000
 
#define SUBOPT_MAX_RETENTION_DURATION   0x00008000
 
#define SUBOPT_WAL_RECEIVER_TIMEOUT   0x00010000
 
#define SUBOPT_LSN   0x00020000
 
#define SUBOPT_ORIGIN   0x00040000
 
#define IsSet(val, bits)   (((val) & (bits)) == (bits))
 
#define appendQuotedIdentifier(b, s)   appendQuotedString(b, s, '"')
 
#define appendQuotedLiteral(b, s)   appendQuotedString(b, s, '\'')
 

Typedefs

typedef struct SubOpts SubOpts
 
typedef struct PublicationRelKind PublicationRelKind
 

Functions

static Listfetch_relation_list (WalReceiverConn *wrconn, List *publications)
 
static void check_publications_origin_tables (WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
 
static void check_publications_origin_sequences (WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
 
static void check_pub_dead_tuple_retention (WalReceiverConn *wrconn)
 
static void check_duplicates_in_publist (List *publist, Datum *datums)
 
static Listmerge_publications (List *oldpublist, List *newpublist, bool addpub, const char *subname)
 
static void ReportSlotConnectionError (List *rstates, Oid subid, char *slotname, char *err)
 
static void CheckAlterSubOption (Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
 
static void parse_subscription_options (ParseState *pstate, List *stmt_options, uint32 supported_opts, SubOpts *opts)
 
static void appendQuotedString (StringInfo buf, const char *str, char quote)
 
static void check_publications (WalReceiverConn *wrconn, List *publications)
 
static Datum publicationListToArray (List *publist)
 
ObjectAddress CreateSubscription (ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
 
static void AlterSubscription_refresh (Subscription *sub, bool copy_data, List *validate_publications)
 
static void AlterSubscription_refresh_seq (Subscription *sub)
 
ObjectAddress AlterSubscription (ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
void ReplicationSlotDropAtPubNode (WalReceiverConn *wrconn, char *slotname, bool missing_ok)
 
static void AlterSubscriptionOwner_internal (Relation rel, HeapTuple tup, Oid newOwnerId)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 
void CheckSubDeadTupleRetention (bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
 
static bool list_member_rangevar (const List *list, RangeVar *rv)
 
char defGetStreamingMode (DefElem *def)
 

Macro Definition Documentation

◆ appendQuotedIdentifier

#define appendQuotedIdentifier (   b,
 
)    appendQuotedString(b, s, '"')

Definition at line 544 of file subscriptioncmds.c.

◆ appendQuotedLiteral

#define appendQuotedLiteral (   b,
 
)    appendQuotedString(b, s, '\'')

Definition at line 545 of file subscriptioncmds.c.

◆ IsSet

#define IsSet (   val,
  bits 
)    (((val) & (bits)) == (bits))

Definition at line 84 of file subscriptioncmds.c.

◆ SUBOPT_BINARY

#define SUBOPT_BINARY   0x00000080

Definition at line 70 of file subscriptioncmds.c.

◆ SUBOPT_CONNECT

#define SUBOPT_CONNECT   0x00000001

Definition at line 63 of file subscriptioncmds.c.

◆ SUBOPT_COPY_DATA

#define SUBOPT_COPY_DATA   0x00000010

Definition at line 67 of file subscriptioncmds.c.

◆ SUBOPT_CREATE_SLOT

#define SUBOPT_CREATE_SLOT   0x00000004

Definition at line 65 of file subscriptioncmds.c.

◆ SUBOPT_DISABLE_ON_ERR

#define SUBOPT_DISABLE_ON_ERR   0x00000400

Definition at line 73 of file subscriptioncmds.c.

◆ SUBOPT_ENABLED

#define SUBOPT_ENABLED   0x00000002

Definition at line 64 of file subscriptioncmds.c.

◆ SUBOPT_FAILOVER

#define SUBOPT_FAILOVER   0x00002000

Definition at line 76 of file subscriptioncmds.c.

◆ SUBOPT_LSN

#define SUBOPT_LSN   0x00020000

Definition at line 80 of file subscriptioncmds.c.

◆ SUBOPT_MAX_RETENTION_DURATION

#define SUBOPT_MAX_RETENTION_DURATION   0x00008000

Definition at line 78 of file subscriptioncmds.c.

◆ SUBOPT_ORIGIN

#define SUBOPT_ORIGIN   0x00040000

Definition at line 81 of file subscriptioncmds.c.

◆ SUBOPT_PASSWORD_REQUIRED

#define SUBOPT_PASSWORD_REQUIRED   0x00000800

Definition at line 74 of file subscriptioncmds.c.

◆ SUBOPT_REFRESH

#define SUBOPT_REFRESH   0x00000040

Definition at line 69 of file subscriptioncmds.c.

◆ SUBOPT_RETAIN_DEAD_TUPLES

#define SUBOPT_RETAIN_DEAD_TUPLES   0x00004000

Definition at line 77 of file subscriptioncmds.c.

◆ SUBOPT_RUN_AS_OWNER

#define SUBOPT_RUN_AS_OWNER   0x00001000

Definition at line 75 of file subscriptioncmds.c.

◆ SUBOPT_SLOT_NAME

#define SUBOPT_SLOT_NAME   0x00000008

Definition at line 66 of file subscriptioncmds.c.

◆ SUBOPT_STREAMING

#define SUBOPT_STREAMING   0x00000100

Definition at line 71 of file subscriptioncmds.c.

◆ SUBOPT_SYNCHRONOUS_COMMIT

#define SUBOPT_SYNCHRONOUS_COMMIT   0x00000020

Definition at line 68 of file subscriptioncmds.c.

◆ SUBOPT_TWOPHASE_COMMIT

#define SUBOPT_TWOPHASE_COMMIT   0x00000200

Definition at line 72 of file subscriptioncmds.c.

◆ SUBOPT_WAL_RECEIVER_TIMEOUT

#define SUBOPT_WAL_RECEIVER_TIMEOUT   0x00010000

Definition at line 79 of file subscriptioncmds.c.

Typedef Documentation

◆ PublicationRelKind

◆ SubOpts

Function Documentation

◆ AlterSubscription()

ObjectAddress AlterSubscription ( ParseState pstate,
AlterSubscriptionStmt stmt,
bool  isTopLevel 
)

Definition at line 1444 of file subscriptioncmds.c.

1446{
1447 Relation rel;
1449 bool nulls[Natts_pg_subscription];
1452 HeapTuple tup;
1453 Oid subid;
1454 bool update_tuple = false;
1455 bool update_failover = false;
1456 bool update_two_phase = false;
1457 bool check_pub_rdt = false;
1458 bool retain_dead_tuples;
1459 int max_retention;
1460 bool retention_active;
1461 char *new_conninfo = NULL;
1462 char *origin;
1463 Subscription *sub;
1466 SubOpts opts = {0};
1467
1469
1470 /* Fetch the existing tuple. */
1472 CStringGetDatum(stmt->subname));
1473
1474 if (!HeapTupleIsValid(tup))
1475 ereport(ERROR,
1477 errmsg("subscription \"%s\" does not exist",
1478 stmt->subname)));
1479
1481 subid = form->oid;
1482
1483 /* must be owner */
1486 stmt->subname);
1487
1488 /*
1489 * Skip ACL checks on the subscription's foreign server, if any. If
1490 * changing the server (or replacing it with a raw connection), then the
1491 * old one will be removed anyway. If changing something unrelated,
1492 * there's no need to do an additional ACL check here; that will be done
1493 * by the subscription worker anyway.
1494 */
1495 sub = GetSubscription(subid, false, false);
1496
1498 origin = sub->origin;
1501
1502 /*
1503 * Don't allow non-superuser modification of a subscription with
1504 * password_required=false.
1505 */
1506 if (!sub->passwordrequired && !superuser())
1507 ereport(ERROR,
1509 errmsg("password_required=false is superuser-only"),
1510 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1511
1512 /* Lock the subscription so nobody else can do anything with it. */
1514
1515 /* Form a new tuple. */
1516 memset(values, 0, sizeof(values));
1517 memset(nulls, false, sizeof(nulls));
1518 memset(replaces, false, sizeof(replaces));
1519
1521
1522 switch (stmt->kind)
1523 {
1525 {
1536
1537 parse_subscription_options(pstate, stmt->options,
1539
1540 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1541 {
1542 /*
1543 * The subscription must be disabled to allow slot_name as
1544 * 'none', otherwise, the apply worker will repeatedly try
1545 * to stream the data using that slot_name which neither
1546 * exists on the publisher nor the user will be allowed to
1547 * create it.
1548 */
1549 if (sub->enabled && !opts.slot_name)
1550 ereport(ERROR,
1552 errmsg("cannot set %s for enabled subscription",
1553 "slot_name = NONE")));
1554
1555 if (opts.slot_name)
1558 else
1559 nulls[Anum_pg_subscription_subslotname - 1] = true;
1561 }
1562
1563 if (opts.synchronous_commit)
1564 {
1566 CStringGetTextDatum(opts.synchronous_commit);
1568 }
1569
1570 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1571 {
1573 BoolGetDatum(opts.binary);
1575 }
1576
1577 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1578 {
1580 CharGetDatum(opts.streaming);
1582 }
1583
1584 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1585 {
1587 = BoolGetDatum(opts.disableonerr);
1589 = true;
1590 }
1591
1592 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1593 {
1594 /* Non-superuser may not disable password_required. */
1595 if (!opts.passwordrequired && !superuser())
1596 ereport(ERROR,
1598 errmsg("password_required=false is superuser-only"),
1599 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1600
1602 = BoolGetDatum(opts.passwordrequired);
1604 = true;
1605 }
1606
1607 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1608 {
1610 BoolGetDatum(opts.runasowner);
1612 }
1613
1614 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1615 {
1616 /*
1617 * We need to update both the slot and the subscription
1618 * for the two_phase option. We can enable the two_phase
1619 * option for a slot only once the initial data
1620 * synchronization is done. This is to avoid missing some
1621 * data as explained in comments atop worker.c.
1622 */
1623 update_two_phase = !opts.twophase;
1624
1625 CheckAlterSubOption(sub, "two_phase", update_two_phase,
1626 isTopLevel);
1627
1628 /*
1629 * Modifying the two_phase slot option requires a slot
1630 * lookup by slot name, so changing the slot name at the
1631 * same time is not allowed.
1632 */
1633 if (update_two_phase &&
1634 IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1635 ereport(ERROR,
1637 errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1638
1639 /*
1640 * Note that workers may still survive even if the
1641 * subscription has been disabled.
1642 *
1643 * Ensure workers have already been exited to avoid
1644 * getting prepared transactions while we are disabling
1645 * the two_phase option. Otherwise, the changes of an
1646 * already prepared transaction can be replicated again
1647 * along with its corresponding commit, leading to
1648 * duplicate data or errors.
1649 */
1650 if (logicalrep_workers_find(subid, true, true))
1651 ereport(ERROR,
1653 errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1654 errhint("Try again after some time.")));
1655
1656 /*
1657 * two_phase cannot be disabled if there are any
1658 * uncommitted prepared transactions present otherwise it
1659 * can lead to duplicate data or errors as explained in
1660 * the comment above.
1661 */
1662 if (update_two_phase &&
1664 LookupGXactBySubid(subid))
1665 ereport(ERROR,
1667 errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1668 errhint("Resolve these transactions and try again.")));
1669
1670 /* Change system catalog accordingly */
1672 CharGetDatum(opts.twophase ?
1676 }
1677
1678 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1679 {
1680 /*
1681 * Similar to the two_phase case above, we need to update
1682 * the failover option for both the slot and the
1683 * subscription.
1684 */
1685 update_failover = true;
1686
1687 CheckAlterSubOption(sub, "failover", update_failover,
1688 isTopLevel);
1689
1691 BoolGetDatum(opts.failover);
1693 }
1694
1695 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1696 {
1698 BoolGetDatum(opts.retaindeadtuples);
1700
1701 /*
1702 * Update the retention status only if there's a change in
1703 * the retain_dead_tuples option value.
1704 *
1705 * Automatically marking retention as active when
1706 * retain_dead_tuples is enabled may not always be ideal,
1707 * especially if retention was previously stopped and the
1708 * user toggles retain_dead_tuples without adjusting the
1709 * publisher workload. However, this behavior provides a
1710 * convenient way for users to manually refresh the
1711 * retention status. Since retention will be stopped again
1712 * unless the publisher workload is reduced, this approach
1713 * is acceptable for now.
1714 */
1715 if (opts.retaindeadtuples != sub->retaindeadtuples)
1716 {
1718 BoolGetDatum(opts.retaindeadtuples);
1720
1721 retention_active = opts.retaindeadtuples;
1722 }
1723
1724 CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1725
1726 /*
1727 * Workers may continue running even after the
1728 * subscription has been disabled.
1729 *
1730 * To prevent race conditions (as described in
1731 * CheckAlterSubOption()), ensure that all worker
1732 * processes have already exited before proceeding.
1733 */
1734 if (logicalrep_workers_find(subid, true, true))
1735 ereport(ERROR,
1737 errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1738 errhint("Try again after some time.")));
1739
1740 /*
1741 * Notify the launcher to manage the replication slot for
1742 * conflict detection. This ensures that replication slot
1743 * is efficiently handled (created, updated, or dropped)
1744 * in response to any configuration changes.
1745 */
1747
1748 check_pub_rdt = opts.retaindeadtuples;
1749 retain_dead_tuples = opts.retaindeadtuples;
1750 }
1751
1752 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1753 {
1755 Int32GetDatum(opts.maxretention);
1757
1758 max_retention = opts.maxretention;
1759 }
1760
1761 /*
1762 * Ensure that system configuration parameters are set
1763 * appropriately to support retain_dead_tuples and
1764 * max_retention_duration.
1765 */
1766 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1767 IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1771 (max_retention > 0));
1772
1773 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1774 {
1776 CStringGetTextDatum(opts.origin);
1778
1779 /*
1780 * Check if changes from different origins may be received
1781 * from the publisher when the origin is changed to ANY
1782 * and retain_dead_tuples is enabled. Use |= so that we
1783 * don't clear the flag already set when
1784 * retain_dead_tuples was changed in the same command.
1785 */
1788
1789 origin = opts.origin;
1790 }
1791
1792 if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
1793 {
1795 CStringGetTextDatum(opts.wal_receiver_timeout);
1797 }
1798
1799 update_tuple = true;
1800 break;
1801 }
1802
1804 {
1805 parse_subscription_options(pstate, stmt->options,
1807 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1808
1809 if (!sub->slotname && opts.enabled)
1810 ereport(ERROR,
1812 errmsg("cannot enable subscription that does not have a slot name")));
1813
1814 /*
1815 * Check track_commit_timestamp only when enabling the
1816 * subscription in case it was disabled after creation. See
1817 * comments atop CheckSubDeadTupleRetention() for details.
1818 */
1819 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1821 sub->retentionactive, false);
1822
1824 BoolGetDatum(opts.enabled);
1826
1827 if (opts.enabled)
1829
1830 update_tuple = true;
1831
1832 /*
1833 * The subscription might be initially created with
1834 * connect=false and retain_dead_tuples=true, meaning the
1835 * remote server's status may not be checked. Ensure this
1836 * check is conducted now.
1837 */
1838 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1839 break;
1840 }
1841
1843 {
1847
1848 /*
1849 * Remove what was there before, either another foreign server
1850 * or a connection string.
1851 */
1852 if (form->subserver)
1853 {
1856 ForeignServerRelationId, form->subserver);
1857 }
1858 else
1859 {
1860 nulls[Anum_pg_subscription_subconninfo - 1] = true;
1862 }
1863
1864 /*
1865 * Check that the subscription owner has USAGE privileges on
1866 * the server.
1867 */
1868 new_server = GetForeignServerByName(stmt->servername, false);
1870 new_server->serverid,
1871 form->subowner, ACL_USAGE);
1872 if (aclresult != ACLCHECK_OK)
1873 ereport(ERROR,
1875 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
1876 GetUserNameFromId(form->subowner, false),
1877 new_server->servername));
1878
1879 /* make sure a user mapping exists */
1880 GetUserMapping(form->subowner, new_server->serverid);
1881
1883 new_server);
1884
1885 /* Load the library providing us libpq calls. */
1886 load_file("libpqwalreceiver", false);
1887 /* Check the connection info string. */
1889 sub->passwordrequired && !sub->ownersuperuser);
1890
1893
1896
1897 update_tuple = true;
1898 }
1899
1900 /*
1901 * Since the remote server configuration might have changed,
1902 * perform a check to ensure it permits enabling
1903 * retain_dead_tuples.
1904 */
1906 break;
1907
1909 /* remove reference to foreign server and dependencies, if present */
1910 if (form->subserver)
1911 {
1914 ForeignServerRelationId, form->subserver);
1915
1918 }
1919
1920 new_conninfo = stmt->conninfo;
1921
1922 /* Load the library providing us libpq calls. */
1923 load_file("libpqwalreceiver", false);
1924 /* Check the connection info string. */
1926 sub->passwordrequired && !sub->ownersuperuser);
1927
1929 CStringGetTextDatum(stmt->conninfo);
1931 update_tuple = true;
1932
1933 /*
1934 * Since the remote server configuration might have changed,
1935 * perform a check to ensure it permits enabling
1936 * retain_dead_tuples.
1937 */
1939 break;
1940
1942 {
1944 parse_subscription_options(pstate, stmt->options,
1946
1948 publicationListToArray(stmt->publication);
1950
1951 update_tuple = true;
1952
1953 /* Refresh if user asked us to. */
1954 if (opts.refresh)
1955 {
1956 if (!sub->enabled)
1957 ereport(ERROR,
1959 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1960 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1961
1962 /*
1963 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1964 * why this is not allowed.
1965 */
1966 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1967 ereport(ERROR,
1969 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1970 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1971
1972 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1973
1974 /* Make sure refresh sees the new list of publications. */
1975 sub->publications = stmt->publication;
1976
1977 AlterSubscription_refresh(sub, opts.copy_data,
1978 stmt->publication);
1979 }
1980
1981 break;
1982 }
1983
1986 {
1987 List *publist;
1989
1991 parse_subscription_options(pstate, stmt->options,
1993
1994 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1998
1999 update_tuple = true;
2000
2001 /* Refresh if user asked us to. */
2002 if (opts.refresh)
2003 {
2004 /* We only need to validate user specified publications. */
2005 List *validate_publications = (isadd) ? stmt->publication : NULL;
2006
2007 if (!sub->enabled)
2008 ereport(ERROR,
2010 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
2011 /* translator: %s is an SQL ALTER command */
2012 errhint("Use %s instead.",
2013 isadd ?
2014 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
2015 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
2016
2017 /*
2018 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
2019 * why this is not allowed.
2020 */
2021 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
2022 ereport(ERROR,
2024 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
2025 /* translator: %s is an SQL ALTER command */
2026 errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
2027 isadd ?
2028 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
2029 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
2030
2031 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
2032
2033 /* Refresh the new list of publications. */
2034 sub->publications = publist;
2035
2036 AlterSubscription_refresh(sub, opts.copy_data,
2038 }
2039
2040 break;
2041 }
2042
2044 {
2045 if (!sub->enabled)
2046 ereport(ERROR,
2048 errmsg("%s is not allowed for disabled subscriptions",
2049 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
2050
2051 parse_subscription_options(pstate, stmt->options,
2053
2054 /*
2055 * The subscription option "two_phase" requires that
2056 * replication has passed the initial table synchronization
2057 * phase before the two_phase becomes properly enabled.
2058 *
2059 * But, having reached this two-phase commit "enabled" state
2060 * we must not allow any subsequent table initialization to
2061 * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
2062 * disallowed when the user had requested two_phase = on mode.
2063 *
2064 * The exception to this restriction is when copy_data =
2065 * false, because when copy_data is false the tablesync will
2066 * start already in READY state and will exit directly without
2067 * doing anything.
2068 *
2069 * For more details see comments atop worker.c.
2070 */
2071 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
2072 ereport(ERROR,
2074 errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
2075 errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
2076
2077 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
2078
2079 AlterSubscription_refresh(sub, opts.copy_data, NULL);
2080
2081 break;
2082 }
2083
2085 {
2086 if (!sub->enabled)
2087 ereport(ERROR,
2089 errmsg("%s is not allowed for disabled subscriptions",
2090 "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
2091
2093
2094 break;
2095 }
2096
2098 {
2099 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
2100
2101 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
2102 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
2103
2104 /*
2105 * If the user sets subskiplsn, we do a sanity check to make
2106 * sure that the specified LSN is a probable value.
2107 */
2108 if (XLogRecPtrIsValid(opts.lsn))
2109 {
2111 char originname[NAMEDATALEN];
2112 XLogRecPtr remote_lsn;
2113
2115 originname, sizeof(originname));
2117 remote_lsn = replorigin_get_progress(originid, false);
2118
2119 /* Check the given LSN is at least a future LSN */
2120 if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
2121 ereport(ERROR,
2123 errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
2124 LSN_FORMAT_ARGS(opts.lsn),
2125 LSN_FORMAT_ARGS(remote_lsn))));
2126 }
2127
2130
2131 update_tuple = true;
2132 break;
2133 }
2134
2135 default:
2136 elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
2137 stmt->kind);
2138 }
2139
2140 /* Update the catalog if needed. */
2141 if (update_tuple)
2142 {
2144 replaces);
2145
2146 CatalogTupleUpdate(rel, &tup->t_self, tup);
2147
2149 }
2150
2151 /*
2152 * Try to acquire the connection necessary either for modifying the slot
2153 * or for checking if the remote server permits enabling
2154 * retain_dead_tuples.
2155 *
2156 * This has to be at the end because otherwise if there is an error while
2157 * doing the database operations we won't be able to rollback altered
2158 * slot.
2159 */
2161 {
2162 bool must_use_password;
2163 char *err;
2165
2166 /* Load the library providing us libpq calls. */
2167 load_file("libpqwalreceiver", false);
2168
2169 /*
2170 * Try to connect to the publisher, using the new connection string if
2171 * available.
2172 */
2174 wrconn = walrcv_connect(new_conninfo ? new_conninfo : sub->conninfo,
2176 &err);
2177 if (!wrconn)
2178 ereport(ERROR,
2180 errmsg("subscription \"%s\" could not connect to the publisher: %s",
2181 sub->name, err)));
2182
2183 PG_TRY();
2184 {
2187
2189 retain_dead_tuples, origin, NULL, 0,
2190 sub->name);
2191
2194 update_failover ? &opts.failover : NULL,
2195 update_two_phase ? &opts.twophase : NULL);
2196 }
2197 PG_FINALLY();
2198 {
2200 }
2201 PG_END_TRY();
2202 }
2203
2205
2207
2208 /* Wake up related replication workers to handle this change quickly. */
2210
2211 return myself;
2212}
AclResult
Definition acl.h:183
@ ACLCHECK_OK
Definition acl.h:184
@ ACLCHECK_NOT_OWNER
Definition acl.h:186
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition aclchk.c:2672
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition aclchk.c:3880
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition aclchk.c:4134
void LogicalRepWorkersWakeupAtCommit(Oid subid)
Definition worker.c:6334
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:648
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define Assert(condition)
Definition c.h:943
uint32_t uint32
Definition c.h:624
@ DEPENDENCY_NORMAL
Definition dependency.h:33
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
int errcode(int sqlerrcode)
Definition elog.c:875
int errhint(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:374
#define WARNING
Definition elog.h:37
#define PG_END_TRY(...)
Definition elog.h:399
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define NOTICE
Definition elog.h:36
#define PG_FINALLY(...)
Definition elog.h:391
#define ereport(elevel,...)
Definition elog.h:152
void err(int eval, const char *fmt,...)
Definition err.c:43
#define DirectFunctionCall1(func, arg1)
Definition fmgr.h:688
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition foreign.c:185
char * ForeignServerConnectionString(Oid userid, ForeignServer *server)
Definition foreign.c:202
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition foreign.c:232
Oid MyDatabaseId
Definition globals.c:96
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1118
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define stmt
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
return true
Definition isn.c:130
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:303
void ApplyLauncherWakeupAtCommit(void)
Definition launcher.c:1185
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessExclusiveLock
Definition lockdefs.h:43
#define RowExclusiveLock
Definition lockdefs.h:38
Oid GetUserId(void)
Definition miscinit.c:470
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition miscinit.c:990
Datum namein(PG_FUNCTION_ARGS)
Definition name.c:48
static char * errmsg
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:243
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
Definition origin.c:1057
@ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SERVER
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH_SEQUENCES
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
#define ACL_USAGE
Definition parsenodes.h:84
@ OBJECT_SUBSCRIPTION
static AmcheckOptions opts
Definition pg_amcheck.c:112
#define NAMEDATALEN
void recordDependencyOn(const ObjectAddress *depender, const ObjectAddress *referenced, DependencyType behavior)
Definition pg_depend.c:51
long deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char deptype, Oid refclassId, Oid refobjectId)
Definition pg_depend.c:411
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
END_CATALOG_STRUCT typedef FormData_pg_subscription * Form_pg_subscription
int pg_strcasecmp(const char *s1, const char *s2)
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static Datum CStringGetDatum(const char *X)
Definition postgres.h:383
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define InvalidOid
unsigned int Oid
static int fb(int x)
#define RelationGetDescr(relation)
Definition rel.h:542
Definition pg_list.h:54
#define SUBOPT_STREAMING
#define SUBOPT_PASSWORD_REQUIRED
#define SUBOPT_SYNCHRONOUS_COMMIT
#define SUBOPT_ENABLED
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
#define SUBOPT_RETAIN_DEAD_TUPLES
#define SUBOPT_ORIGIN
static Datum publicationListToArray(List *publist)
#define SUBOPT_FAILOVER
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_SLOT_NAME
#define SUBOPT_COPY_DATA
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
#define SUBOPT_DISABLE_ON_ERR
static void parse_subscription_options(ParseState *pstate, List *stmt_options, uint32 supported_opts, SubOpts *opts)
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
static void AlterSubscription_refresh_seq(Subscription *sub)
#define SUBOPT_LSN
#define SUBOPT_MAX_RETENTION_DURATION
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
#define SUBOPT_WAL_RECEIVER_TIMEOUT
static void check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
#define SUBOPT_BINARY
#define IsSet(val, bits)
#define SUBOPT_REFRESH
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
bool superuser(void)
Definition superuser.c:47
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition syscache.h:93
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool LookupGXactBySubid(Oid subid)
Definition twophase.c:2803
const char * name
static WalReceiverConn * wrconn
Definition walreceiver.c:95
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_disconnect(conn)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3698
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21

References AccessExclusiveLock, ACL_USAGE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_REFRESH_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH_SEQUENCES, ALTER_SUBSCRIPTION_SERVER, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_SKIP, AlterSubscription_refresh(), AlterSubscription_refresh_seq(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum(), CatalogTupleUpdate(), CharGetDatum(), check_pub_dead_tuple_retention(), check_publications_origin_tables(), CheckAlterSubOption(), CheckSubDeadTupleRetention(), Subscription::conninfo, CStringGetDatum(), CStringGetTextDatum, deleteDependencyRecordsForSpecific(), DEPENDENCY_NORMAL, DirectFunctionCall1, elog, Subscription::enabled, ereport, err(), errcode(), errhint(), errmsg, ERROR, fb(), ForeignServerConnectionString(), Form_pg_subscription, GetForeignServerByName(), GETSTRUCT(), GetSubscription(), GetUserId(), GetUserMapping(), GetUserNameFromId(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, Int32GetDatum(), InvalidOid, InvokeObjectPostAlterHook, IsSet, load_file(), LockSharedObject(), logicalrep_workers_find(), LogicalRepWorkersWakeupAtCommit(), LookupGXactBySubid(), LSN_FORMAT_ARGS, LSNGetDatum(), Subscription::maxretention, merge_publications(), MyDatabaseId, Subscription::name, NAMEDATALEN, namein(), NOTICE, object_aclcheck(), object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, ObjectIdGetDatum(), opts, Subscription::origin, Subscription::ownersuperuser, parse_subscription_options(), Subscription::passwordrequired, PG_END_TRY, PG_FINALLY, pg_strcasecmp(), PG_TRY, PreventInTransactionBlock(), publicationListToArray(), Subscription::publications, recordDependencyOn(), RelationGetDescr, ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_get_progress(), Subscription::retaindeadtuples, Subscription::retentionactive, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, stmt, SUBOPT_BINARY, SUBOPT_COPY_DATA, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_FAILOVER, SUBOPT_LSN, SUBOPT_MAX_RETENTION_DURATION, SUBOPT_ORIGIN, SUBOPT_PASSWORD_REQUIRED, SUBOPT_REFRESH, SUBOPT_RETAIN_DEAD_TUPLES, SUBOPT_RUN_AS_OWNER, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, SUBOPT_WAL_RECEIVER_TIMEOUT, superuser(), table_close(), table_open(), Subscription::twophasestate, values, walrcv_alter_slot, walrcv_check_conninfo, walrcv_connect, walrcv_disconnect, WARNING, wrconn, and XLogRecPtrIsValid.

Referenced by ProcessUtilitySlow().

◆ AlterSubscription_refresh()

static void AlterSubscription_refresh ( Subscription sub,
bool  copy_data,
List validate_publications 
)
static

Definition at line 1030 of file subscriptioncmds.c.

1032{
1033 char *err;
1034 List *pubrels = NIL;
1040 int subrel_count;
1041 ListCell *lc;
1042 int off;
1043 int tbl_count = 0;
1044 int seq_count = 0;
1045 Relation rel = NULL;
1046 typedef struct SubRemoveRels
1047 {
1048 Oid relid;
1049 char state;
1050 } SubRemoveRels;
1051
1053 bool must_use_password;
1054
1055 /* Load the library providing us libpq calls. */
1056 load_file("libpqwalreceiver", false);
1057
1058 /* Try to connect to the publisher. */
1060 wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1061 sub->name, &err);
1062 if (!wrconn)
1063 ereport(ERROR,
1065 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1066 sub->name, err)));
1067
1068 PG_TRY();
1069 {
1072
1073 /* Get the relation list from publisher. */
1075
1076 /* Get local relation list. */
1077 subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
1079
1080 /*
1081 * Build qsorted arrays of local table oids and sequence oids for
1082 * faster lookup. This can potentially contain all tables and
1083 * sequences in the database so speed of lookup is important.
1084 *
1085 * We do not yet know the exact count of tables and sequences, so we
1086 * allocate separate arrays for table OIDs and sequence OIDs based on
1087 * the total number of relations (subrel_count).
1088 */
1091 foreach(lc, subrel_states)
1092 {
1094
1095 if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
1096 subseq_local_oids[seq_count++] = relstate->relid;
1097 else
1098 subrel_local_oids[tbl_count++] = relstate->relid;
1099 }
1100
1103 sub->retaindeadtuples, sub->origin,
1105 sub->name);
1106
1109 copy_data, sub->origin,
1111 sub->name);
1112
1113 /*
1114 * Walk over the remote relations and try to match them to locally
1115 * known relations. If the relation is not known locally create a new
1116 * state for it.
1117 *
1118 * Also builds array of local oids of remote relations for the next
1119 * step.
1120 */
1121 off = 0;
1123
1125 {
1126 RangeVar *rv = pubrelinfo->rv;
1127 Oid relid;
1128 char relkind;
1129
1130 relid = RangeVarGetRelid(rv, AccessShareLock, false);
1131 relkind = get_rel_relkind(relid);
1132
1133 /* Check for supported relkind. */
1134 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
1135 rv->schemaname, rv->relname);
1136
1137 pubrel_local_oids[off++] = relid;
1138
1139 if (!bsearch(&relid, subrel_local_oids,
1140 tbl_count, sizeof(Oid), oid_cmp) &&
1141 !bsearch(&relid, subseq_local_oids,
1142 seq_count, sizeof(Oid), oid_cmp))
1143 {
1144 AddSubscriptionRelState(sub->oid, relid,
1148 errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
1149 relkind == RELKIND_SEQUENCE ? "sequence" : "table",
1150 rv->schemaname, rv->relname, sub->name));
1151 }
1152 }
1153
1154 /*
1155 * Next remove state for tables we should not care about anymore using
1156 * the data we collected above
1157 */
1159
1160 for (off = 0; off < tbl_count; off++)
1161 {
1162 Oid relid = subrel_local_oids[off];
1163
1164 if (!bsearch(&relid, pubrel_local_oids,
1165 list_length(pubrels), sizeof(Oid), oid_cmp))
1166 {
1167 char state;
1168 XLogRecPtr statelsn;
1170
1171 /*
1172 * Lock pg_subscription_rel with AccessExclusiveLock to
1173 * prevent any race conditions with the apply worker
1174 * re-launching workers at the same time this code is trying
1175 * to remove those tables.
1176 *
1177 * Even if new worker for this particular rel is restarted it
1178 * won't be able to make any progress as we hold exclusive
1179 * lock on pg_subscription_rel till the transaction end. It
1180 * will simply exit as there is no corresponding rel entry.
1181 *
1182 * This locking also ensures that the state of rels won't
1183 * change till we are done with this refresh operation.
1184 */
1185 if (!rel)
1187
1188 /* Last known rel state. */
1189 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
1190
1191 RemoveSubscriptionRel(sub->oid, relid);
1192
1193 remove_rel->relid = relid;
1194 remove_rel->state = state;
1195
1197
1199
1200 /*
1201 * For READY state, we would have already dropped the
1202 * tablesync origin.
1203 */
1205 {
1206 char originname[NAMEDATALEN];
1207
1208 /*
1209 * Drop the tablesync's origin tracking if exists.
1210 *
1211 * It is possible that the origin is not yet created for
1212 * tablesync worker, this can happen for the states before
1213 * SUBREL_STATE_DATASYNC. The tablesync worker or apply
1214 * worker can also concurrently try to drop the origin and
1215 * by this time the origin might be already removed. For
1216 * these reasons, passing missing_ok = true.
1217 */
1219 sizeof(originname));
1220 replorigin_drop_by_name(originname, true, false);
1221 }
1222
1224 (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1226 get_rel_name(relid),
1227 sub->name)));
1228 }
1229 }
1230
1231 /*
1232 * Drop the tablesync slots associated with removed tables. This has
1233 * to be at the end because otherwise if there is an error while doing
1234 * the database operations we won't be able to rollback dropped slots.
1235 */
1237 {
1238 if (sub_remove_rel->state != SUBREL_STATE_READY &&
1240 {
1241 char syncslotname[NAMEDATALEN] = {0};
1242
1243 /*
1244 * For READY/SYNCDONE states we know the tablesync slot has
1245 * already been dropped by the tablesync worker.
1246 *
1247 * For other states, there is no certainty, maybe the slot
1248 * does not exist yet. Also, if we fail after removing some of
1249 * the slots, next time, it will again try to drop already
1250 * dropped slots and fail. For these reasons, we allow
1251 * missing_ok = true for the drop.
1252 */
1254 syncslotname, sizeof(syncslotname));
1256 }
1257 }
1258
1259 /*
1260 * Next remove state for sequences we should not care about anymore
1261 * using the data we collected above
1262 */
1263 for (off = 0; off < seq_count; off++)
1264 {
1265 Oid relid = subseq_local_oids[off];
1266
1267 if (!bsearch(&relid, pubrel_local_oids,
1268 list_length(pubrels), sizeof(Oid), oid_cmp))
1269 {
1270 /*
1271 * This locking ensures that the state of rels won't change
1272 * till we are done with this refresh operation.
1273 */
1274 if (!rel)
1276
1277 RemoveSubscriptionRel(sub->oid, relid);
1278
1280 errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
1282 get_rel_name(relid),
1283 sub->name));
1284 }
1285 }
1286 }
1287 PG_FINALLY();
1288 {
1290 }
1291 PG_END_TRY();
1292
1293 if (rel)
1294 table_close(rel, NoLock);
1295}
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define DEBUG1
Definition elog.h:31
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
#define palloc_object(type)
Definition fe_memutils.h:89
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:662
List * lappend(List *list, void *datum)
Definition list.c:339
#define NoLock
Definition lockdefs.h:34
#define AccessShareLock
Definition lockdefs.h:36
char * get_rel_name(Oid relid)
Definition lsyscache.c:2234
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2309
Oid get_rel_namespace(Oid relid)
Definition lsyscache.c:2258
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3674
void * palloc(Size size)
Definition mcxt.c:1390
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition namespace.h:98
int oid_cmp(const void *p1, const void *p2)
Definition oid.c:287
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:459
NameData relname
Definition pg_class.h:40
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
void RemoveSubscriptionRel(Oid subid, Oid relid)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
#define qsort(a, b, c, d)
Definition port.h:496
char * relname
Definition primnodes.h:84
char * schemaname
Definition primnodes.h:81
static void check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static void check_publications(WalReceiverConn *wrconn, List *publications)
static List * fetch_relation_list(WalReceiverConn *wrconn, List *publications)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition tablesync.c:1236
@ WORKERTYPE_TABLESYNC
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References AccessExclusiveLock, AccessShareLock, AddSubscriptionRelState(), check_publications(), check_publications_origin_sequences(), check_publications_origin_tables(), CheckSubscriptionRelkind(), Subscription::conninfo, DEBUG1, ereport, err(), errcode(), errmsg, errmsg_internal(), ERROR, fb(), fetch_relation_list(), foreach_ptr, get_namespace_name(), get_rel_name(), get_rel_namespace(), get_rel_relkind(), GetSubscriptionRelations(), GetSubscriptionRelState(), InvalidXLogRecPtr, lappend(), lfirst, list_length(), load_file(), logicalrep_worker_stop(), Subscription::name, NAMEDATALEN, NIL, NoLock, Subscription::oid, oid_cmp(), Subscription::origin, Subscription::ownersuperuser, palloc(), palloc_object, Subscription::passwordrequired, PG_END_TRY, PG_FINALLY, PG_TRY, Subscription::publications, qsort, RangeVarGetRelid, SubscriptionRelState::relid, RangeVar::relname, RemoveSubscriptionRel(), ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), Subscription::retaindeadtuples, RangeVar::schemaname, table_close(), table_open(), walrcv_connect, walrcv_disconnect, WORKERTYPE_TABLESYNC, and wrconn.

Referenced by AlterSubscription().

◆ AlterSubscription_refresh_seq()

static void AlterSubscription_refresh_seq ( Subscription sub)
static

Definition at line 1301 of file subscriptioncmds.c.

1302{
1303 char *err = NULL;
1305 bool must_use_password;
1306
1307 /* Load the library providing us libpq calls. */
1308 load_file("libpqwalreceiver", false);
1309
1310 /* Try to connect to the publisher. */
1312 wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1313 sub->name, &err);
1314 if (!wrconn)
1315 ereport(ERROR,
1317 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1318 sub->name, err));
1319
1320 PG_TRY();
1321 {
1323
1325 sub->origin, NULL, 0, sub->name);
1326
1327 /* Get local sequence list. */
1328 subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
1330 {
1331 Oid relid = subrel->relid;
1332
1334 InvalidXLogRecPtr, false);
1336 errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
1338 get_rel_name(relid),
1339 sub->name));
1340 }
1341 }
1342 PG_FINALLY();
1343 {
1345 }
1346 PG_END_TRY();
1347}
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)

References check_publications_origin_sequences(), Subscription::conninfo, DEBUG1, ereport, err(), errcode(), errmsg, errmsg_internal(), ERROR, fb(), foreach_ptr, get_namespace_name(), get_rel_name(), get_rel_namespace(), GetSubscriptionRelations(), InvalidXLogRecPtr, load_file(), Subscription::name, Subscription::oid, Subscription::origin, Subscription::ownersuperuser, Subscription::passwordrequired, PG_END_TRY, PG_FINALLY, PG_TRY, Subscription::publications, UpdateSubscriptionRelState(), walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by AlterSubscription().

◆ AlterSubscriptionOwner()

ObjectAddress AlterSubscriptionOwner ( const char name,
Oid  newOwnerId 
)

Definition at line 2676 of file subscriptioncmds.c.

2677{
2678 Oid subid;
2679 HeapTuple tup;
2680 Relation rel;
2681 ObjectAddress address;
2683
2685
2688
2689 if (!HeapTupleIsValid(tup))
2690 ereport(ERROR,
2692 errmsg("subscription \"%s\" does not exist", name)));
2693
2695 subid = form->oid;
2696
2698
2700
2702
2704
2705 return address;
2706}
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)

References AlterSubscriptionOwner_internal(), CStringGetDatum(), ereport, errcode(), errmsg, ERROR, fb(), Form_pg_subscription, GETSTRUCT(), heap_freetuple(), HeapTupleIsValid, MyDatabaseId, name, ObjectAddressSet, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy2, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

◆ AlterSubscriptionOwner_internal()

static void AlterSubscriptionOwner_internal ( Relation  rel,
HeapTuple  tup,
Oid  newOwnerId 
)
static

Definition at line 2595 of file subscriptioncmds.c.

2596{
2599
2601
2602 if (form->subowner == newOwnerId)
2603 return;
2604
2607 NameStr(form->subname));
2608
2609 /*
2610 * Don't allow non-superuser modification of a subscription with
2611 * password_required=false.
2612 */
2613 if (!form->subpasswordrequired && !superuser())
2614 ereport(ERROR,
2616 errmsg("password_required=false is superuser-only"),
2617 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2618
2619 /* Must be able to become new owner */
2621
2622 /*
2623 * current owner must have CREATE on database
2624 *
2625 * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2626 * other object types behave differently (e.g. you can't give a table to a
2627 * user who lacks CREATE privileges on a schema).
2628 */
2631 if (aclresult != ACLCHECK_OK)
2634
2635 /*
2636 * If the subscription uses a server, check that the new owner has USAGE
2637 * privileges on the server and that a user mapping exists. Note: does not
2638 * re-check the resulting connection string.
2639 */
2640 if (OidIsValid(form->subserver))
2641 {
2642 ForeignServer *server = GetForeignServer(form->subserver);
2643
2645 if (aclresult != ACLCHECK_OK)
2646 ereport(ERROR,
2648 errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
2650 server->servername));
2651
2652 /* make sure a user mapping exists */
2654 }
2655
2656 form->subowner = newOwnerId;
2657 CatalogTupleUpdate(rel, &tup->t_self, tup);
2658
2659 /* Update owner dependency reference */
2661 form->oid,
2662 newOwnerId);
2663
2665 form->oid, 0);
2666
2667 /* Wake up related background processes to handle this change quickly. */
2670}
void check_can_set_role(Oid member, Oid role)
Definition acl.c:5371
#define NameStr(name)
Definition c.h:835
#define OidIsValid(objectId)
Definition c.h:858
ForeignServer * GetForeignServer(Oid serverid)
Definition foreign.c:114
char * get_database_name(Oid dbid)
Definition lsyscache.c:1384
@ OBJECT_DATABASE
#define ACL_CREATE
Definition parsenodes.h:85
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
char * servername
Definition foreign.h:40

References ACL_CREATE, ACL_USAGE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, ApplyLauncherWakeupAtCommit(), CatalogTupleUpdate(), changeDependencyOnOwner(), check_can_set_role(), ereport, errcode(), errhint(), errmsg, ERROR, fb(), Form_pg_subscription, get_database_name(), GetForeignServer(), GETSTRUCT(), GetUserId(), GetUserMapping(), GetUserNameFromId(), InvokeObjectPostAlterHook, LogicalRepWorkersWakeupAtCommit(), MyDatabaseId, NameStr, object_aclcheck(), OBJECT_DATABASE, object_ownercheck(), OBJECT_SUBSCRIPTION, OidIsValid, ForeignServer::serverid, ForeignServer::servername, and superuser().

Referenced by AlterSubscriptionOwner(), and AlterSubscriptionOwner_oid().

◆ AlterSubscriptionOwner_oid()

void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 2712 of file subscriptioncmds.c.

2713{
2714 HeapTuple tup;
2715 Relation rel;
2716
2718
2720
2721 if (!HeapTupleIsValid(tup))
2722 ereport(ERROR,
2724 errmsg("subscription with OID %u does not exist", subid)));
2725
2727
2729
2731}
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg, ERROR, fb(), heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by shdepReassignOwned_Owner().

◆ appendQuotedString()

static void appendQuotedString ( StringInfo  buf,
const char str,
char  quote 
)
static

Definition at line 530 of file subscriptioncmds.c.

531{
533 while (*str)
534 {
535 char c = *str++;
536
537 if (c == quote)
540 }
542}
const char * str
static char buf[DEFAULT_XLOG_SEG_SIZE]
char * c
void appendStringInfoChar(StringInfo str, char ch)
Definition stringinfo.c:242

References appendStringInfoChar(), buf, and str.

◆ check_duplicates_in_publist()

static void check_duplicates_in_publist ( List publist,
Datum datums 
)
static

Definition at line 3315 of file subscriptioncmds.c.

3316{
3317 ListCell *cell;
3318 int j = 0;
3319
3320 foreach(cell, publist)
3321 {
3322 char *name = strVal(lfirst(cell));
3323 ListCell *pcell;
3324
3325 foreach(pcell, publist)
3326 {
3327 char *pname = strVal(lfirst(pcell));
3328
3329 if (pcell == cell)
3330 break;
3331
3332 if (strcmp(name, pname) == 0)
3333 ereport(ERROR,
3335 errmsg("publication name \"%s\" used more than once",
3336 pname)));
3337 }
3338
3339 if (datums)
3340 datums[j++] = CStringGetTextDatum(name);
3341 }
3342}
int j
Definition isn.c:78
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
#define strVal(v)
Definition value.h:82

References CStringGetTextDatum, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg, ERROR, fb(), j, lfirst, name, and strVal.

Referenced by merge_publications().

◆ check_pub_dead_tuple_retention()

static void check_pub_dead_tuple_retention ( WalReceiverConn wrconn)
static

Definition at line 3022 of file subscriptioncmds.c.

3023{
3024 WalRcvExecResult *res;
3025 Oid RecoveryRow[1] = {BOOLOID};
3026 TupleTableSlot *slot;
3027 bool isnull;
3028 bool remote_in_recovery;
3029
3030 if (walrcv_server_version(wrconn) < 190000)
3031 ereport(ERROR,
3033 errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
3034
3035 res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
3036
3037 if (res->status != WALRCV_OK_TUPLES)
3038 ereport(ERROR,
3040 errmsg("could not obtain recovery progress from the publisher: %s",
3041 res->err)));
3042
3044 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3045 elog(ERROR, "failed to fetch tuple for the recovery progress");
3046
3047 remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
3048
3050 ereport(ERROR,
3052 errmsg("cannot enable retain_dead_tuples if the publisher is in recovery"));
3053
3055
3057}
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
Definition execTuples.c:86
static bool DatumGetBool(Datum X)
Definition postgres.h:100
Tuplestorestate * tuplestore
TupleDesc tupledesc
WalRcvExecStatus status
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition tuptable.h:417
@ WALRCV_OK_TUPLES
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)

References DatumGetBool(), elog, ereport, WalRcvExecResult::err, errcode(), errmsg, ERROR, ExecDropSingleTupleTableSlot(), fb(), MakeSingleTupleTableSlot(), slot_getattr(), WalRcvExecResult::status, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, walrcv_server_version, and wrconn.

Referenced by AlterSubscription().

◆ check_publications()

static void check_publications ( WalReceiverConn wrconn,
List publications 
)
static

Definition at line 551 of file subscriptioncmds.c.

552{
553 WalRcvExecResult *res;
554 StringInfoData cmd;
555 TupleTableSlot *slot;
557 Oid tableRow[1] = {TEXTOID};
558
559 initStringInfo(&cmd);
560 appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
561 " pg_catalog.pg_publication t WHERE\n"
562 " t.pubname IN (");
563 GetPublicationsStr(publications, &cmd, true);
564 appendStringInfoChar(&cmd, ')');
565
566 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
567 pfree(cmd.data);
568
569 if (res->status != WALRCV_OK_TUPLES)
571 errmsg("could not receive list of publications from the publisher: %s",
572 res->err));
573
574 publicationsCopy = list_copy(publications);
575
576 /* Process publication(s). */
578 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
579 {
580 char *pubname;
581 bool isnull;
582
583 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
584 Assert(!isnull);
585
586 /* Delete the publication present in publisher from the list. */
588 ExecClearTuple(slot);
589 }
590
592
594
596 {
597 /* Prepare the list of non-existent publication(s) for error message. */
599
601
605 errmsg_plural("publication %s does not exist on the publisher",
606 "publications %s do not exist on the publisher",
608 pubnames.data));
609 }
610}
#define TextDatumGetCString(d)
Definition builtins.h:99
int int int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...) pg_attribute_printf(1
List * list_delete(List *list, void *datum)
Definition list.c:853
List * list_copy(const List *oldlist)
Definition list.c:1573
void pfree(void *pointer)
Definition mcxt.c:1619
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
String * makeString(char *str)
Definition value.c:63

Referenced by AlterSubscription_refresh().

◆ check_publications_origin_sequences()

static void check_publications_origin_sequences ( WalReceiverConn wrconn,
List publications,
bool  copydata,
char origin,
Oid subrel_local_oids,
int  subrel_count,
char subname 
)
static

Definition at line 2905 of file subscriptioncmds.c.

2909{
2910 WalRcvExecResult *res;
2911 StringInfoData cmd;
2912 TupleTableSlot *slot;
2913 Oid tableRow[1] = {TEXTOID};
2914 List *publist = NIL;
2915
2916 /*
2917 * Enable sequence synchronization checks only when origin is 'none' , to
2918 * ensure that sequence data from other origins is not inadvertently
2919 * copied. This check is necessary if the publisher is running PG19 or
2920 * later, where logical replication sequence synchronization is supported.
2921 */
2922 if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0 ||
2923 walrcv_server_version(wrconn) < 190000)
2924 return;
2925
2926 initStringInfo(&cmd);
2928 "SELECT DISTINCT P.pubname AS pubname\n"
2929 "FROM pg_publication P,\n"
2930 " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
2931 " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
2932 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2933 "WHERE C.oid = GPS.relid AND P.pubname IN (");
2934
2935 GetPublicationsStr(publications, &cmd, true);
2936 appendStringInfoString(&cmd, ")\n");
2937
2938 /*
2939 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2940 * subrel_local_oids contains the list of relations that are already
2941 * present on the subscriber. This check should be skipped as these will
2942 * not be re-synced.
2943 */
2944 for (int i = 0; i < subrel_count; i++)
2945 {
2946 Oid relid = subrel_local_oids[i];
2947 char *schemaname = get_namespace_name(get_rel_namespace(relid));
2948 char *seqname = get_rel_name(relid);
2949 char *schemaname_lit = quote_literal_cstr(schemaname);
2950 char *seqname_lit = quote_literal_cstr(seqname);
2951
2952 appendStringInfo(&cmd,
2953 "AND NOT (N.nspname = %s AND C.relname = %s)\n",
2955
2958 }
2959
2960 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2961 pfree(cmd.data);
2962
2963 if (res->status != WALRCV_OK_TUPLES)
2964 ereport(ERROR,
2966 errmsg("could not receive list of replicated sequences from the publisher: %s",
2967 res->err)));
2968
2969 /* Process publications. */
2971 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2972 {
2973 char *pubname;
2974 bool isnull;
2975
2976 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2977 Assert(!isnull);
2978
2979 ExecClearTuple(slot);
2981 }
2982
2983 /*
2984 * Log a warning if the publisher has subscribed to the same sequence from
2985 * some other publisher. We cannot know the origin of sequences data
2986 * during the initial sync.
2987 */
2988 if (publist)
2989 {
2991
2992 /* Prepare the list of publication(s) for warning message. */
2995
2998 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2999 subname),
3000 errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
3001 "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
3003 errhint("Verify that initial data copied from the publisher sequences did not come from other origins."));
3004 }
3005
3007
3009}
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...) pg_attribute_printf(1
int i
Definition isn.c:77
List * list_append_unique(List *list, void *datum)
Definition list.c:1343
NameData subname
char * quote_literal_cstr(const char *rawstr)
Definition quote.c:101
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145

References appendStringInfo(), appendStringInfoString(), Assert, StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errdetail_plural(), errhint(), errmsg, ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), fb(), get_namespace_name(), get_rel_name(), get_rel_namespace(), GetPublicationsStr(), i, initStringInfo(), list_append_unique(), list_length(), MakeSingleTupleTableSlot(), makeString(), NIL, pfree(), pg_strcasecmp(), quote_literal_cstr(), slot_getattr(), WalRcvExecResult::status, subname, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, walrcv_server_version, WARNING, and wrconn.

Referenced by AlterSubscription_refresh(), and AlterSubscription_refresh_seq().

◆ check_publications_origin_tables()

static void check_publications_origin_tables ( WalReceiverConn wrconn,
List publications,
bool  copydata,
bool  retain_dead_tuples,
char origin,
Oid subrel_local_oids,
int  subrel_count,
char subname 
)
static

Definition at line 2757 of file subscriptioncmds.c.

2761{
2762 WalRcvExecResult *res;
2763 StringInfoData cmd;
2764 TupleTableSlot *slot;
2765 Oid tableRow[1] = {TEXTOID};
2766 List *publist = NIL;
2767 int i;
2768 bool check_rdt;
2769 bool check_table_sync;
2770 bool origin_none = origin &&
2772
2773 /*
2774 * Enable retain_dead_tuples checks only when origin is set to 'any',
2775 * since with origin='none' only local changes are replicated to the
2776 * subscriber.
2777 */
2779
2780 /*
2781 * Enable table synchronization checks only when origin is 'none', to
2782 * ensure that data from other origins is not inadvertently copied.
2783 */
2785
2786 /* retain_dead_tuples and table sync checks occur separately */
2788
2789 /* Return if no checks are required */
2790 if (!check_rdt && !check_table_sync)
2791 return;
2792
2793 initStringInfo(&cmd);
2795 "SELECT DISTINCT P.pubname AS pubname\n"
2796 "FROM pg_publication P,\n"
2797 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2798 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2799 " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2800 " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2801 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2802 "WHERE C.oid = GPT.relid AND P.pubname IN (");
2803 GetPublicationsStr(publications, &cmd, true);
2804 appendStringInfoString(&cmd, ")\n");
2805
2806 /*
2807 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2808 * subrel_local_oids contains the list of relation oids that are already
2809 * present on the subscriber. This check should be skipped for these
2810 * tables if checking for table sync scenario. However, when handling the
2811 * retain_dead_tuples scenario, ensure all tables are checked, as some
2812 * existing tables may now include changes from other origins due to newly
2813 * created subscriptions on the publisher.
2814 */
2815 if (check_table_sync)
2816 {
2817 for (i = 0; i < subrel_count; i++)
2818 {
2819 Oid relid = subrel_local_oids[i];
2820 char *schemaname = get_namespace_name(get_rel_namespace(relid));
2821 char *tablename = get_rel_name(relid);
2822 char *schemaname_lit = quote_literal_cstr(schemaname);
2823 char *tablename_lit = quote_literal_cstr(tablename);
2824
2825 appendStringInfo(&cmd, "AND NOT (N.nspname = %s AND C.relname = %s)\n",
2827
2830 }
2831 }
2832
2833 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2834 pfree(cmd.data);
2835
2836 if (res->status != WALRCV_OK_TUPLES)
2837 ereport(ERROR,
2839 errmsg("could not receive list of replicated tables from the publisher: %s",
2840 res->err)));
2841
2842 /* Process publications. */
2844 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2845 {
2846 char *pubname;
2847 bool isnull;
2848
2849 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2850 Assert(!isnull);
2851
2852 ExecClearTuple(slot);
2854 }
2855
2856 /*
2857 * Log a warning if the publisher has subscribed to the same table from
2858 * some other publisher. We cannot know the origin of data during the
2859 * initial sync. Data origins can be found only from the WAL by looking at
2860 * the origin id.
2861 *
2862 * XXX: For simplicity, we don't check whether the table has any data or
2863 * not. If the table doesn't have any data then we don't need to
2864 * distinguish between data having origin and data not having origin so we
2865 * can avoid logging a warning for table sync scenario.
2866 */
2867 if (publist)
2868 {
2870
2871 /* Prepare the list of publication(s) for warning message. */
2874
2875 if (check_table_sync)
2878 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2879 subname),
2880 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2881 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2883 errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
2884 else
2887 errmsg("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
2888 subname),
2889 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2890 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2892 errhint("Consider using origin = NONE or disabling retain_dead_tuples."));
2893 }
2894
2896
2898}

References appendStringInfo(), appendStringInfoString(), Assert, StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errdetail_plural(), errhint(), errmsg, ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), fb(), get_namespace_name(), get_rel_name(), get_rel_namespace(), GetPublicationsStr(), i, initStringInfo(), list_append_unique(), list_length(), MakeSingleTupleTableSlot(), makeString(), NIL, pfree(), pg_strcasecmp(), quote_literal_cstr(), slot_getattr(), WalRcvExecResult::status, subname, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, WARNING, and wrconn.

Referenced by AlterSubscription(), and AlterSubscription_refresh().

◆ CheckAlterSubOption()

static void CheckAlterSubOption ( Subscription sub,
const char option,
bool  slot_needs_update,
bool  isTopLevel 
)
static

Definition at line 1354 of file subscriptioncmds.c.

1356{
1357 Assert(strcmp(option, "failover") == 0 ||
1358 strcmp(option, "two_phase") == 0 ||
1359 strcmp(option, "retain_dead_tuples") == 0);
1360
1361 /*
1362 * Altering the retain_dead_tuples option does not update the slot on the
1363 * publisher.
1364 */
1365 Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1366
1367 /*
1368 * Do not allow changing the option if the subscription is enabled. This
1369 * is because both failover and two_phase options of the slot on the
1370 * publisher cannot be modified if the slot is currently acquired by the
1371 * existing walsender.
1372 *
1373 * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1374 * the publisher by the existing walsender, so we could have allowed that
1375 * even when the subscription is enabled. But we kept this restriction for
1376 * the sake of consistency and simplicity.
1377 *
1378 * Additionally, do not allow changing the retain_dead_tuples option when
1379 * the subscription is enabled to prevent race conditions arising from the
1380 * new option value being acknowledged asynchronously by the launcher and
1381 * apply workers.
1382 *
1383 * Without the restriction, a race condition may arise when a user
1384 * disables and immediately re-enables the retain_dead_tuples option. In
1385 * this case, the launcher might drop the slot upon noticing the disabled
1386 * action, while the apply worker may keep maintaining
1387 * oldest_nonremovable_xid without noticing the option change. During this
1388 * period, a transaction ID wraparound could falsely make this ID appear
1389 * as if it originates from the future w.r.t the transaction ID stored in
1390 * the slot maintained by launcher.
1391 *
1392 * Similarly, if the user enables retain_dead_tuples concurrently with the
1393 * launcher starting the worker, the apply worker may start calculating
1394 * oldest_nonremovable_xid before the launcher notices the enable action.
1395 * Consequently, the launcher may update slot.xmin to a newer value than
1396 * that maintained by the worker. In subsequent cycles, upon integrating
1397 * the worker's oldest_nonremovable_xid, the launcher might detect a
1398 * retreat in the calculated xmin, necessitating additional handling.
1399 *
1400 * XXX To address the above race conditions, we can define
1401 * oldest_nonremovable_xid as FullTransactionId and adds the check to
1402 * disallow retreating the conflict slot's xmin. For now, we kept the
1403 * implementation simple by disallowing change to the retain_dead_tuples,
1404 * but in the future we can change this after some more analysis.
1405 *
1406 * Note that we could restrict only the enabling of retain_dead_tuples to
1407 * avoid the race conditions described above, but we maintain the
1408 * restriction for both enable and disable operations for the sake of
1409 * consistency.
1410 */
1411 if (sub->enabled)
1412 ereport(ERROR,
1414 errmsg("cannot set option \"%s\" for enabled subscription",
1415 option)));
1416
1418 {
1419 StringInfoData cmd;
1420
1421 /*
1422 * A valid slot must be associated with the subscription for us to
1423 * modify any of the slot's properties.
1424 */
1425 if (!sub->slotname)
1426 ereport(ERROR,
1428 errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1429 option)));
1430
1431 /* The changed option of the slot can't be rolled back. */
1432 initStringInfo(&cmd);
1433 appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1434
1436 pfree(cmd.data);
1437 }
1438}

References appendStringInfo(), Assert, StringInfoData::data, Subscription::enabled, ereport, errcode(), errmsg, ERROR, fb(), initStringInfo(), pfree(), PreventInTransactionBlock(), and Subscription::slotname.

Referenced by AlterSubscription().

◆ CheckSubDeadTupleRetention()

void CheckSubDeadTupleRetention ( bool  check_guc,
bool  sub_disabled,
int  elevel_for_sub_disabled,
bool  retain_dead_tuples,
bool  retention_active,
bool  max_retention_set 
)

Definition at line 3083 of file subscriptioncmds.c.

3087{
3090
3092 {
3094 ereport(ERROR,
3096 errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
3097 errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
3098
3102 errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
3103 errhint("Consider setting \"%s\" to true.",
3104 "track_commit_timestamp"));
3105
3109 errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
3111 ? errhint("Consider setting %s to false.",
3112 "retain_dead_tuples") : 0);
3113 }
3114 else if (max_retention_set)
3115 {
3118 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
3119 }
3120}
bool track_commit_timestamp
Definition commit_ts.c:121
int wal_level
Definition xlog.c:138
@ WAL_LEVEL_REPLICA
Definition xlog.h:77

References Assert, ereport, errcode(), errhint(), errmsg, ERROR, fb(), NOTICE, track_commit_timestamp, wal_level, WAL_LEVEL_REPLICA, and WARNING.

Referenced by AlterSubscription(), and DisableSubscriptionAndExit().

◆ CreateSubscription()

ObjectAddress CreateSubscription ( ParseState pstate,
CreateSubscriptionStmt stmt,
bool  isTopLevel 
)

Definition at line 646 of file subscriptioncmds.c.

648{
649 Relation rel;
651 Oid subid;
652 bool nulls[Natts_pg_subscription];
654 Oid owner = GetUserId();
656 Oid serverid;
657 char *conninfo;
659 List *publications;
661 SubOpts opts = {0};
663
664 /*
665 * Parse and check options.
666 *
667 * Connection and publication should not be specified here.
668 */
679
680 /*
681 * Since creating a replication slot is not transactional, rolling back
682 * the transaction leaves the created replication slot. So we cannot run
683 * CREATE SUBSCRIPTION inside a transaction block if creating a
684 * replication slot.
685 */
686 if (opts.create_slot)
687 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
688
689 /*
690 * We don't want to allow unprivileged users to be able to trigger
691 * attempts to access arbitrary network destinations, so require the user
692 * to have been specifically authorized to create subscriptions.
693 */
697 errmsg("permission denied to create subscription"),
698 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
699 "pg_create_subscription")));
700
701 /*
702 * Since a subscription is a database object, we also check for CREATE
703 * permission on the database.
704 */
706 owner, ACL_CREATE);
707 if (aclresult != ACLCHECK_OK)
710
711 /*
712 * Non-superusers are required to set a password for authentication, and
713 * that password must be used by the target server, but the superuser can
714 * exempt a subscription from this requirement.
715 */
716 if (!opts.passwordrequired && !superuser_arg(owner))
719 errmsg("password_required=false is superuser-only"),
720 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
721
722 /*
723 * If built with appropriate switch, whine when regression-testing
724 * conventions for subscription names are violated.
725 */
726#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
727 if (strncmp(stmt->subname, "regress_", 8) != 0)
728 elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
729#endif
730
732
733 /* Check if name is used */
736 if (OidIsValid(subid))
737 {
740 errmsg("subscription \"%s\" already exists",
741 stmt->subname)));
742 }
743
744 /*
745 * Ensure that system configuration parameters are set appropriately to
746 * support retain_dead_tuples and max_retention_duration.
747 */
749 opts.retaindeadtuples, opts.retaindeadtuples,
750 (opts.maxretention > 0));
751
752 if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
753 opts.slot_name == NULL)
754 opts.slot_name = stmt->subname;
755
756 /* The default for synchronous_commit of subscriptions is off. */
757 if (opts.synchronous_commit == NULL)
758 opts.synchronous_commit = "off";
759
760 /*
761 * The default for wal_receiver_timeout of subscriptions is -1, which
762 * means the value is inherited from the server configuration, command
763 * line, or role/database settings.
764 */
765 if (opts.wal_receiver_timeout == NULL)
766 opts.wal_receiver_timeout = "-1";
767
768 /* Load the library providing us libpq calls. */
769 load_file("libpqwalreceiver", false);
770
771 if (stmt->servername)
772 {
773 ForeignServer *server;
774
775 Assert(!stmt->conninfo);
776 conninfo = NULL;
777
778 server = GetForeignServerByName(stmt->servername, false);
780 if (aclresult != ACLCHECK_OK)
782
783 /* make sure a user mapping exists */
784 GetUserMapping(owner, server->serverid);
785
786 serverid = server->serverid;
787 conninfo = ForeignServerConnectionString(owner, server);
788 }
789 else
790 {
791 Assert(stmt->conninfo);
792
793 serverid = InvalidOid;
794 conninfo = stmt->conninfo;
795 }
796
797 /* Check the connection info string. */
798 walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
799
800 publications = stmt->publication;
801
802 /* Everything ok, form a new tuple. */
803 memset(values, 0, sizeof(values));
804 memset(nulls, false, sizeof(nulls));
805
818 CharGetDatum(opts.twophase ?
826 BoolGetDatum(opts.retaindeadtuples);
828 Int32GetDatum(opts.maxretention);
830 BoolGetDatum(opts.retaindeadtuples);
832 if (!OidIsValid(serverid))
834 CStringGetTextDatum(conninfo);
835 else
836 nulls[Anum_pg_subscription_subconninfo - 1] = true;
837 if (opts.slot_name)
840 else
841 nulls[Anum_pg_subscription_subslotname - 1] = true;
843 CStringGetTextDatum(opts.synchronous_commit);
845 CStringGetTextDatum(opts.wal_receiver_timeout);
847 publicationListToArray(publications);
850
852
853 /* Insert tuple into catalog. */
856
858
860
861 if (stmt->servername)
862 {
864
865 Assert(OidIsValid(serverid));
866
869 }
870
871 /*
872 * A replication origin is currently created for all subscriptions,
873 * including those that only contain sequences or are otherwise empty.
874 *
875 * XXX: While this is technically unnecessary, optimizing it would require
876 * additional logic to skip origin creation during DDL operations and
877 * apply workers initialization, and to handle origin creation dynamically
878 * when tables are added to the subscription. It is not clear whether
879 * preventing creation of origins is worth additional complexity.
880 */
883
884 /*
885 * Connect to remote side to execute requested commands and fetch table
886 * and sequence info.
887 */
888 if (opts.connect)
889 {
890 char *err;
893
894 /* Try to connect to the publisher. */
895 must_use_password = !superuser_arg(owner) && opts.passwordrequired;
896 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
897 stmt->subname, &err);
898 if (!wrconn)
901 errmsg("subscription \"%s\" could not connect to the publisher: %s",
902 stmt->subname, err)));
903
904 PG_TRY();
905 {
906 bool has_tables = false;
907 List *pubrels;
908 char relation_state;
909
910 check_publications(wrconn, publications);
912 opts.copy_data,
913 opts.retaindeadtuples, opts.origin,
914 NULL, 0, stmt->subname);
916 opts.copy_data, opts.origin,
917 NULL, 0, stmt->subname);
918
919 if (opts.retaindeadtuples)
921
922 /*
923 * Set sync state based on if we were asked to do data copy or
924 * not.
925 */
927
928 /*
929 * Build local relation status info. Relations are for both tables
930 * and sequences from the publisher.
931 */
932 pubrels = fetch_relation_list(wrconn, publications);
933
935 {
936 Oid relid;
937 char relkind;
938 RangeVar *rv = pubrelinfo->rv;
939
940 relid = RangeVarGetRelid(rv, AccessShareLock, false);
941 relkind = get_rel_relkind(relid);
942
943 /* Check for supported relkind. */
944 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
945 rv->schemaname, rv->relname);
946 has_tables |= (relkind != RELKIND_SEQUENCE);
948 InvalidXLogRecPtr, true);
949 }
950
951 /*
952 * If requested, create permanent slot for the subscription. We
953 * won't use the initial snapshot for anything, so no need to
954 * export it.
955 *
956 * XXX: Similar to origins, it is not clear whether preventing the
957 * slot creation for empty and sequence-only subscriptions is
958 * worth additional complexity.
959 */
960 if (opts.create_slot)
961 {
962 bool twophase_enabled = false;
963
964 Assert(opts.slot_name);
965
966 /*
967 * Even if two_phase is set, don't create the slot with
968 * two-phase enabled. Will enable it once all the tables are
969 * synced and ready. This avoids race-conditions like prepared
970 * transactions being skipped due to changes not being applied
971 * due to checks in should_apply_changes_for_rel() when
972 * tablesync for the corresponding tables are in progress. See
973 * comments atop worker.c.
974 *
975 * Note that if tables were specified but copy_data is false
976 * then it is safe to enable two_phase up-front because those
977 * tables are already initially in READY state. When the
978 * subscription has no tables, we leave the twophase state as
979 * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
980 * PUBLICATION to work.
981 */
982 if (opts.twophase && !opts.copy_data && has_tables)
983 twophase_enabled = true;
984
986 opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
987
990
992 (errmsg("created replication slot \"%s\" on publisher",
993 opts.slot_name)));
994 }
995 }
996 PG_FINALLY();
997 {
999 }
1000 PG_END_TRY();
1001 }
1002 else
1004 (errmsg("subscription was created, but is not connected"),
1005 errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
1006
1008
1010
1011 /*
1012 * Notify the launcher to start the apply worker if the subscription is
1013 * enabled, or to create the conflict detection slot if retain_dead_tuples
1014 * is enabled.
1015 *
1016 * Creating the conflict detection slot is essential even when the
1017 * subscription is not enabled. This ensures that dead tuples are
1018 * retained, which is necessary for accurately identifying the type of
1019 * conflict during replication.
1020 */
1021 if (opts.enabled || opts.retaindeadtuples)
1023
1025
1026 return myself;
1027}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5314
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition catalog.c:448
int errdetail(const char *fmt,...) pg_attribute_printf(1
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
#define InvokeObjectPostCreateHook(classId, objectId, subId)
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:274
@ OBJECT_FOREIGN_SERVER
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void pgstat_create_subscription(Oid subid)
#define SUBOPT_CREATE_SLOT
#define SUBOPT_CONNECT
bool superuser_arg(Oid roleid)
Definition superuser.c:57
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition syscache.h:111
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition tablesync.c:1681
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23

Referenced by ProcessUtilitySlow().

◆ defGetStreamingMode()

char defGetStreamingMode ( DefElem def)

Definition at line 3414 of file subscriptioncmds.c.

3415{
3416 /*
3417 * If no parameter value given, assume "true" is meant.
3418 */
3419 if (!def->arg)
3420 return LOGICALREP_STREAM_ON;
3421
3422 /*
3423 * Allow 0, 1, "false", "true", "off", "on" or "parallel".
3424 */
3425 switch (nodeTag(def->arg))
3426 {
3427 case T_Integer:
3428 switch (intVal(def->arg))
3429 {
3430 case 0:
3431 return LOGICALREP_STREAM_OFF;
3432 case 1:
3433 return LOGICALREP_STREAM_ON;
3434 default:
3435 /* otherwise, error out below */
3436 break;
3437 }
3438 break;
3439 default:
3440 {
3441 char *sval = defGetString(def);
3442
3443 /*
3444 * The set of strings accepted here should match up with the
3445 * grammar's opt_boolean_or_string production.
3446 */
3447 if (pg_strcasecmp(sval, "false") == 0 ||
3448 pg_strcasecmp(sval, "off") == 0)
3449 return LOGICALREP_STREAM_OFF;
3450 if (pg_strcasecmp(sval, "true") == 0 ||
3451 pg_strcasecmp(sval, "on") == 0)
3452 return LOGICALREP_STREAM_ON;
3453 if (pg_strcasecmp(sval, "parallel") == 0)
3455 }
3456 break;
3457 }
3458
3459 ereport(ERROR,
3461 errmsg("%s requires a Boolean value or \"parallel\"",
3462 def->defname)));
3463 return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
3464}
char * defGetString(DefElem *def)
Definition define.c:34
#define nodeTag(nodeptr)
Definition nodes.h:137
char * defname
Definition parsenodes.h:862
Node * arg
Definition parsenodes.h:863
#define intVal(v)
Definition value.h:79

References DefElem::arg, defGetString(), DefElem::defname, ereport, errcode(), errmsg, ERROR, fb(), intVal, nodeTag, and pg_strcasecmp().

Referenced by parse_output_parameters(), and parse_subscription_options().

◆ DropSubscription()

void DropSubscription ( DropSubscriptionStmt stmt,
bool  isTopLevel 
)

Definition at line 2218 of file subscriptioncmds.c.

2219{
2220 Relation rel;
2222 HeapTuple tup;
2223 Oid subid;
2224 Oid subowner;
2225 Datum datum;
2226 bool isnull;
2227 char *subname;
2228 char *conninfo;
2229 char *slotname;
2231 ListCell *lc;
2232 char originname[NAMEDATALEN];
2233 char *err = NULL;
2236 List *rstates;
2237 bool must_use_password;
2238
2239 /*
2240 * The launcher may concurrently start a new worker for this subscription.
2241 * During initialization, the worker checks for subscription validity and
2242 * exits if the subscription has already been dropped. See
2243 * InitializeLogRepWorker.
2244 */
2246
2248 CStringGetDatum(stmt->subname));
2249
2250 if (!HeapTupleIsValid(tup))
2251 {
2252 table_close(rel, NoLock);
2253
2254 if (!stmt->missing_ok)
2255 ereport(ERROR,
2257 errmsg("subscription \"%s\" does not exist",
2258 stmt->subname)));
2259 else
2261 (errmsg("subscription \"%s\" does not exist, skipping",
2262 stmt->subname)));
2263
2264 return;
2265 }
2266
2268 subid = form->oid;
2269 subowner = form->subowner;
2270 must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
2271
2272 /* must be owner */
2275 stmt->subname);
2276
2277 /* DROP hook for the subscription being removed */
2279
2280 /*
2281 * Lock the subscription so nobody else can do anything with it (including
2282 * the replication workers).
2283 */
2285
2286 /* Get subname */
2289 subname = pstrdup(NameStr(*DatumGetName(datum)));
2290
2291 /* Get conninfo */
2292 if (OidIsValid(form->subserver))
2293 {
2295 ForeignServer *server;
2296
2297 server = GetForeignServer(form->subserver);
2299 form->subowner, ACL_USAGE);
2300 if (aclresult != ACLCHECK_OK)
2301 {
2302 /*
2303 * Unable to generate connection string because permissions on the
2304 * foreign server have been removed. Follow the same logic as an
2305 * unusable subconninfo (which will result in an ERROR later
2306 * unless slot_name = NONE).
2307 */
2308 err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
2309 GetUserNameFromId(form->subowner, false),
2310 server->servername);
2311 conninfo = NULL;
2312 }
2313 else
2314 conninfo = ForeignServerConnectionString(form->subowner,
2315 server);
2316 }
2317 else
2318 {
2321 conninfo = TextDatumGetCString(datum);
2322 }
2323
2324 /* Get slotname */
2327 if (!isnull)
2328 slotname = pstrdup(NameStr(*DatumGetName(datum)));
2329 else
2330 slotname = NULL;
2331
2332 /*
2333 * Since dropping a replication slot is not transactional, the replication
2334 * slot stays dropped even if the transaction rolls back. So we cannot
2335 * run DROP SUBSCRIPTION inside a transaction block if dropping the
2336 * replication slot. Also, in this case, we report a message for dropping
2337 * the subscription to the cumulative stats system.
2338 *
2339 * XXX The command name should really be something like "DROP SUBSCRIPTION
2340 * of a subscription that is associated with a replication slot", but we
2341 * don't have the proper facilities for that.
2342 */
2343 if (slotname)
2344 PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
2345
2348
2349 /* Remove the tuple from catalog. */
2350 CatalogTupleDelete(rel, &tup->t_self);
2351
2353
2354 /*
2355 * Stop all the subscription workers immediately.
2356 *
2357 * This is necessary if we are dropping the replication slot, so that the
2358 * slot becomes accessible.
2359 *
2360 * It is also necessary if the subscription is disabled and was disabled
2361 * in the same transaction. Then the workers haven't seen the disabling
2362 * yet and will still be running, leading to hangs later when we want to
2363 * drop the replication origin. If the subscription was disabled before
2364 * this transaction, then there shouldn't be any workers left, so this
2365 * won't make a difference.
2366 *
2367 * New workers won't be started because we hold an exclusive lock on the
2368 * subscription till the end of the transaction.
2369 */
2370 subworkers = logicalrep_workers_find(subid, false, true);
2371 foreach(lc, subworkers)
2372 {
2374
2376 }
2378
2379 /*
2380 * Remove the no-longer-useful entry in the launcher's table of apply
2381 * worker start times.
2382 *
2383 * If this transaction rolls back, the launcher might restart a failed
2384 * apply worker before wal_retrieve_retry_interval milliseconds have
2385 * elapsed, but that's pretty harmless.
2386 */
2388
2389 /*
2390 * Cleanup of tablesync replication origins.
2391 *
2392 * Any READY-state relations would already have dealt with clean-ups.
2393 *
2394 * Note that the state can't change because we have already stopped both
2395 * the apply and tablesync workers and they can't restart because of
2396 * exclusive lock on the subscription.
2397 */
2398 rstates = GetSubscriptionRelations(subid, true, false, true);
2399 foreach(lc, rstates)
2400 {
2402 Oid relid = rstate->relid;
2403
2404 /* Only cleanup resources of tablesync workers */
2405 if (!OidIsValid(relid))
2406 continue;
2407
2408 /*
2409 * Drop the tablesync's origin tracking if exists.
2410 *
2411 * It is possible that the origin is not yet created for tablesync
2412 * worker so passing missing_ok = true. This can happen for the states
2413 * before SUBREL_STATE_DATASYNC.
2414 */
2416 sizeof(originname));
2417 replorigin_drop_by_name(originname, true, false);
2418 }
2419
2420 /* Clean up dependencies */
2423
2424 /* Remove any associated relation synchronization states. */
2426
2427 /* Remove the origin tracking if exists. */
2429 replorigin_drop_by_name(originname, true, false);
2430
2431 /*
2432 * Tell the cumulative stats system that the subscription is getting
2433 * dropped.
2434 */
2436
2437 /*
2438 * If there is no slot associated with the subscription, we can finish
2439 * here.
2440 */
2441 if (!slotname && rstates == NIL)
2442 {
2443 table_close(rel, NoLock);
2444 return;
2445 }
2446
2447 /*
2448 * Try to acquire the connection necessary for dropping slots.
2449 *
2450 * Note: If the slotname is NONE/NULL then we allow the command to finish
2451 * and users need to manually cleanup the apply and tablesync worker slots
2452 * later.
2453 *
2454 * This has to be at the end because otherwise if there is an error while
2455 * doing the database operations we won't be able to rollback dropped
2456 * slot.
2457 */
2458 load_file("libpqwalreceiver", false);
2459
2460 if (conninfo)
2461 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2462 subname, &err);
2463
2464 if (wrconn == NULL)
2465 {
2466 if (!slotname)
2467 {
2468 /* be tidy */
2470 table_close(rel, NoLock);
2471 return;
2472 }
2473 else
2474 {
2475 ReportSlotConnectionError(rstates, subid, slotname, err);
2476 }
2477 }
2478
2479 PG_TRY();
2480 {
2481 foreach(lc, rstates)
2482 {
2484 Oid relid = rstate->relid;
2485
2486 /* Only cleanup resources of tablesync workers */
2487 if (!OidIsValid(relid))
2488 continue;
2489
2490 /*
2491 * Drop the tablesync slots associated with removed tables.
2492 *
2493 * For SYNCDONE/READY states, the tablesync slot is known to have
2494 * already been dropped by the tablesync worker.
2495 *
2496 * For other states, there is no certainty, maybe the slot does
2497 * not exist yet. Also, if we fail after removing some of the
2498 * slots, next time, it will again try to drop already dropped
2499 * slots and fail. For these reasons, we allow missing_ok = true
2500 * for the drop.
2501 */
2502 if (rstate->state != SUBREL_STATE_SYNCDONE)
2503 {
2504 char syncslotname[NAMEDATALEN] = {0};
2505
2507 sizeof(syncslotname));
2509 }
2510 }
2511
2513
2514 /*
2515 * If there is a slot associated with the subscription, then drop the
2516 * replication slot at the publisher.
2517 */
2518 if (slotname)
2519 ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2520 }
2521 PG_FINALLY();
2522 {
2524 }
2525 PG_END_TRY();
2526
2527 table_close(rel, NoLock);
2528}
#define _(x)
Definition elog.c:96
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1155
void list_free(List *list)
Definition list.c:1546
char * pstrdup(const char *in)
Definition mcxt.c:1910
#define InvokeObjectDropHook(classId, objectId, subId)
long deleteDependencyRecordsFor(Oid classId, Oid objectId, bool skipExtensionDeps)
Definition pg_depend.c:314
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void pgstat_drop_subscription(Oid subid)
static Name DatumGetName(Datum X)
Definition postgres.h:393
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
LogicalRepWorkerType type
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache2(SysCacheIdentifier cacheId, Datum key1, Datum key2)
Definition syscache.c:231
Datum SysCacheGetAttrNotNull(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition syscache.c:626
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:596

References _, AccessExclusiveLock, ACL_USAGE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, ApplyLauncherForgetWorkerStartTime(), CatalogTupleDelete(), CStringGetDatum(), DatumGetName(), deleteDependencyRecordsFor(), deleteSharedDependencyRecordsFor(), ereport, err(), errcode(), errmsg, ERROR, EventTriggerSQLDropAddObject(), fb(), ForeignServerConnectionString(), Form_pg_subscription, GetForeignServer(), GETSTRUCT(), GetSubscriptionRelations(), GetUserId(), GetUserNameFromId(), HeapTupleIsValid, InvalidOid, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), MyDatabaseId, NAMEDATALEN, NameStr, NIL, NoLock, NOTICE, object_aclcheck(), object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, ObjectIdGetDatum(), OidIsValid, PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_drop_subscription(), PreventInTransactionBlock(), psprintf(), pstrdup(), ReleaseSysCache(), SubscriptionRelState::relid, LogicalRepWorker::relid, RemoveSubscriptionRel(), ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), ReportSlotConnectionError(), RowExclusiveLock, SearchSysCache2(), ForeignServer::servername, SubscriptionRelState::state, stmt, LogicalRepWorker::subid, subname, superuser_arg(), SysCacheGetAttr(), SysCacheGetAttrNotNull(), table_close(), table_open(), TextDatumGetCString, LogicalRepWorker::type, walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by ProcessUtilitySlow().

◆ fetch_relation_list()

static List * fetch_relation_list ( WalReceiverConn wrconn,
List publications 
)
static

Definition at line 3147 of file subscriptioncmds.c.

3148{
3149 WalRcvExecResult *res;
3150 StringInfoData cmd;
3151 TupleTableSlot *slot;
3155 bool check_columnlist = (server_version >= 150000);
3156 int column_count = check_columnlist ? 4 : 3;
3157 StringInfoData pub_names;
3158
3159 initStringInfo(&cmd);
3160 initStringInfo(&pub_names);
3161
3162 /* Build the pub_names comma-separated string. */
3163 GetPublicationsStr(publications, &pub_names, true);
3164
3165 /* Get the list of relations from the publisher */
3166 if (server_version >= 160000)
3167 {
3169
3170 /*
3171 * From version 16, we allowed passing multiple publications to the
3172 * function pg_get_publication_tables. This helped to filter out the
3173 * partition table whose ancestor is also published in this
3174 * publication array.
3175 *
3176 * Join pg_get_publication_tables with pg_publication to exclude
3177 * non-existing publications.
3178 *
3179 * Note that attrs are always stored in sorted order so we don't need
3180 * to worry if different publications have specified them in a
3181 * different order. See pub_collist_validate.
3182 */
3183 appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
3184 " FROM pg_class c\n"
3185 " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
3186 " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
3187 " FROM pg_publication\n"
3188 " WHERE pubname IN ( %s )) AS gpt\n"
3189 " ON gpt.relid = c.oid\n",
3190 pub_names.data);
3191
3192 /* From version 19, inclusion of sequences in the target is supported */
3193 if (server_version >= 190000)
3194 appendStringInfo(&cmd,
3195 "UNION ALL\n"
3196 " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
3197 " FROM pg_catalog.pg_publication_sequences s\n"
3198 " WHERE s.pubname IN ( %s )",
3199 pub_names.data);
3200 }
3201 else
3202 {
3204 appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
3205
3206 /* Get column lists for each relation if the publisher supports it */
3207 if (check_columnlist)
3208 appendStringInfoString(&cmd, ", t.attnames\n");
3209
3210 appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
3211 " WHERE t.pubname IN ( %s )",
3212 pub_names.data);
3213 }
3214
3215 pfree(pub_names.data);
3216
3218 pfree(cmd.data);
3219
3220 if (res->status != WALRCV_OK_TUPLES)
3221 ereport(ERROR,
3223 errmsg("could not receive list of replicated tables from the publisher: %s",
3224 res->err)));
3225
3226 /* Process tables. */
3228 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3229 {
3230 char *nspname;
3231 char *relname;
3232 bool isnull;
3233 char relkind;
3235
3236 nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
3237 Assert(!isnull);
3238 relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
3239 Assert(!isnull);
3240 relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
3241 Assert(!isnull);
3242
3243 relinfo->rv = makeRangeVar(nspname, relname, -1);
3244 relinfo->relkind = relkind;
3245
3246 if (relkind != RELKIND_SEQUENCE &&
3249 ereport(ERROR,
3251 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
3252 nspname, relname));
3253 else
3255
3256 ExecClearTuple(slot);
3257 }
3259
3261
3262 return relationlist;
3263}
#define CppAsString2(x)
Definition c.h:506
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition makefuncs.c:473
static int server_version
Definition pg_dumpall.c:122
static char DatumGetChar(Datum X)
Definition postgres.h:122
static bool list_member_rangevar(const List *list, RangeVar *rv)

References appendStringInfo(), appendStringInfoString(), Assert, CppAsString2, StringInfoData::data, DatumGetChar(), ereport, WalRcvExecResult::err, errcode(), errmsg, ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), fb(), GetPublicationsStr(), initStringInfo(), InvalidOid, lappend(), list_member_rangevar(), makeRangeVar(), MakeSingleTupleTableSlot(), NIL, palloc_object, pfree(), relname, server_version, slot_getattr(), WalRcvExecResult::status, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, walrcv_server_version, and wrconn.

Referenced by AlterSubscription_refresh().

◆ list_member_rangevar()

static bool list_member_rangevar ( const List list,
RangeVar rv 
)
static

Definition at line 3126 of file subscriptioncmds.c.

3127{
3129 {
3130 if (equal(relinfo->rv, rv))
3131 return true;
3132 }
3133
3134 return false;
3135}
bool equal(const void *a, const void *b)
Definition equalfuncs.c:223

References equal(), fb(), and foreach_ptr.

Referenced by fetch_relation_list().

◆ merge_publications()

static List * merge_publications ( List oldpublist,
List newpublist,
bool  addpub,
const char subname 
)
static

Definition at line 3355 of file subscriptioncmds.c.

3356{
3357 ListCell *lc;
3358
3360
3362
3363 foreach(lc, newpublist)
3364 {
3365 char *name = strVal(lfirst(lc));
3366 ListCell *lc2;
3367 bool found = false;
3368
3369 foreach(lc2, oldpublist)
3370 {
3371 char *pubname = strVal(lfirst(lc2));
3372
3373 if (strcmp(name, pubname) == 0)
3374 {
3375 found = true;
3376 if (addpub)
3377 ereport(ERROR,
3379 errmsg("publication \"%s\" is already in subscription \"%s\"",
3380 name, subname)));
3381 else
3383
3384 break;
3385 }
3386 }
3387
3388 if (addpub && !found)
3390 else if (!addpub && !found)
3391 ereport(ERROR,
3393 errmsg("publication \"%s\" is not in subscription \"%s\"",
3394 name, subname)));
3395 }
3396
3397 /*
3398 * XXX Probably no strong reason for this, but for now it's to make ALTER
3399 * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
3400 */
3401 if (!oldpublist)
3402 ereport(ERROR,
3404 errmsg("cannot drop all the publications from a subscription")));
3405
3406 return oldpublist;
3407}
#define foreach_delete_current(lst, var_or_cell)
Definition pg_list.h:423
static void check_duplicates_in_publist(List *publist, Datum *datums)

References check_duplicates_in_publist(), ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg, ERROR, fb(), foreach_delete_current, lappend(), lfirst, list_copy(), makeString(), name, strVal, and subname.

Referenced by AlterSubscription().

◆ parse_subscription_options()

static void parse_subscription_options ( ParseState pstate,
List stmt_options,
uint32  supported_opts,
SubOpts opts 
)
static

Definition at line 152 of file subscriptioncmds.c.

154{
155 ListCell *lc;
156
157 /* Start out with cleared opts. */
158 memset(opts, 0, sizeof(SubOpts));
159
160 /* caller must expect some option */
162
163 /* If connect option is supported, these others also need to be. */
167
168 /* Set default values for the supported options. */
170 opts->connect = true;
172 opts->enabled = true;
174 opts->create_slot = true;
176 opts->copy_data = true;
178 opts->refresh = true;
180 opts->binary = false;
182 opts->streaming = LOGICALREP_STREAM_PARALLEL;
184 opts->twophase = false;
186 opts->disableonerr = false;
188 opts->passwordrequired = true;
190 opts->runasowner = false;
192 opts->failover = false;
194 opts->retaindeadtuples = false;
196 opts->maxretention = 0;
199
200 /* Parse options */
201 foreach(lc, stmt_options)
202 {
203 DefElem *defel = (DefElem *) lfirst(lc);
204
206 strcmp(defel->defname, "connect") == 0)
207 {
208 if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
210
211 opts->specified_opts |= SUBOPT_CONNECT;
212 opts->connect = defGetBoolean(defel);
213 }
215 strcmp(defel->defname, "enabled") == 0)
216 {
217 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
219
220 opts->specified_opts |= SUBOPT_ENABLED;
221 opts->enabled = defGetBoolean(defel);
222 }
224 strcmp(defel->defname, "create_slot") == 0)
225 {
226 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
228
229 opts->specified_opts |= SUBOPT_CREATE_SLOT;
230 opts->create_slot = defGetBoolean(defel);
231 }
233 strcmp(defel->defname, "slot_name") == 0)
234 {
235 if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
237
238 opts->specified_opts |= SUBOPT_SLOT_NAME;
239 opts->slot_name = defGetString(defel);
240
241 /* Setting slot_name = NONE is treated as no slot name. */
242 if (strcmp(opts->slot_name, "none") == 0)
243 opts->slot_name = NULL;
244 else
245 ReplicationSlotValidateName(opts->slot_name, false, ERROR);
246 }
248 strcmp(defel->defname, "copy_data") == 0)
249 {
250 if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
252
253 opts->specified_opts |= SUBOPT_COPY_DATA;
254 opts->copy_data = defGetBoolean(defel);
255 }
257 strcmp(defel->defname, "synchronous_commit") == 0)
258 {
259 if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
261
262 opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
263 opts->synchronous_commit = defGetString(defel);
264
265 /* Test if the given value is valid for synchronous_commit GUC. */
266 (void) set_config_option("synchronous_commit", opts->synchronous_commit,
268 false, 0, false);
269 }
271 strcmp(defel->defname, "refresh") == 0)
272 {
273 if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
275
276 opts->specified_opts |= SUBOPT_REFRESH;
277 opts->refresh = defGetBoolean(defel);
278 }
280 strcmp(defel->defname, "binary") == 0)
281 {
282 if (IsSet(opts->specified_opts, SUBOPT_BINARY))
284
285 opts->specified_opts |= SUBOPT_BINARY;
286 opts->binary = defGetBoolean(defel);
287 }
289 strcmp(defel->defname, "streaming") == 0)
290 {
291 if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
293
294 opts->specified_opts |= SUBOPT_STREAMING;
295 opts->streaming = defGetStreamingMode(defel);
296 }
298 strcmp(defel->defname, "two_phase") == 0)
299 {
300 if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
302
303 opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
304 opts->twophase = defGetBoolean(defel);
305 }
307 strcmp(defel->defname, "disable_on_error") == 0)
308 {
309 if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
311
312 opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
313 opts->disableonerr = defGetBoolean(defel);
314 }
316 strcmp(defel->defname, "password_required") == 0)
317 {
318 if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
320
321 opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
322 opts->passwordrequired = defGetBoolean(defel);
323 }
325 strcmp(defel->defname, "run_as_owner") == 0)
326 {
327 if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
329
330 opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
331 opts->runasowner = defGetBoolean(defel);
332 }
334 strcmp(defel->defname, "failover") == 0)
335 {
336 if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
338
339 opts->specified_opts |= SUBOPT_FAILOVER;
340 opts->failover = defGetBoolean(defel);
341 }
343 strcmp(defel->defname, "retain_dead_tuples") == 0)
344 {
345 if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
347
348 opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
349 opts->retaindeadtuples = defGetBoolean(defel);
350 }
352 strcmp(defel->defname, "max_retention_duration") == 0)
353 {
354 if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
356
357 opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
358 opts->maxretention = defGetInt32(defel);
359
360 if (opts->maxretention < 0)
363 errmsg("max_retention_duration cannot be negative"));
364 }
366 strcmp(defel->defname, "origin") == 0)
367 {
368 if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
370
371 opts->specified_opts |= SUBOPT_ORIGIN;
372 pfree(opts->origin);
373
374 /*
375 * Even though the "origin" parameter allows only "none" and "any"
376 * values, it is implemented as a string type so that the
377 * parameter can be extended in future versions to support
378 * filtering using origin names specified by the user.
379 */
380 opts->origin = defGetString(defel);
381
382 if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
386 errmsg("unrecognized origin value: \"%s\"", opts->origin));
387 }
388 else if (IsSet(supported_opts, SUBOPT_LSN) &&
389 strcmp(defel->defname, "lsn") == 0)
390 {
391 char *lsn_str = defGetString(defel);
392 XLogRecPtr lsn;
393
394 if (IsSet(opts->specified_opts, SUBOPT_LSN))
396
397 /* Setting lsn = NONE is treated as resetting LSN */
398 if (strcmp(lsn_str, "none") == 0)
399 lsn = InvalidXLogRecPtr;
400 else
401 {
402 /* Parse the argument as LSN */
405
406 if (!XLogRecPtrIsValid(lsn))
409 errmsg("invalid WAL location (LSN): %s", lsn_str)));
410 }
411
412 opts->specified_opts |= SUBOPT_LSN;
413 opts->lsn = lsn;
414 }
416 strcmp(defel->defname, "wal_receiver_timeout") == 0)
417 {
418 bool parsed;
419 int val;
420
421 if (IsSet(opts->specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
423
424 opts->specified_opts |= SUBOPT_WAL_RECEIVER_TIMEOUT;
425 opts->wal_receiver_timeout = defGetString(defel);
426
427 /*
428 * Test if the given value is valid for wal_receiver_timeout GUC.
429 * Skip this test if the value is -1, since -1 is allowed for the
430 * wal_receiver_timeout subscription option, but not for the GUC
431 * itself.
432 */
433 parsed = parse_int(opts->wal_receiver_timeout, &val, 0, NULL);
434 if (!parsed || val != -1)
435 (void) set_config_option("wal_receiver_timeout", opts->wal_receiver_timeout,
437 false, 0, false);
438 }
439 else
442 errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
443 }
444
445 /*
446 * We've been explicitly asked to not connect, that requires some
447 * additional processing.
448 */
449 if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
450 {
451 /* Check for incompatible options from the user. */
452 if (opts->enabled &&
453 IsSet(opts->specified_opts, SUBOPT_ENABLED))
456 /*- translator: both %s are strings of the form "option = value" */
457 errmsg("%s and %s are mutually exclusive options",
458 "connect = false", "enabled = true")));
459
460 if (opts->create_slot &&
461 IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
464 errmsg("%s and %s are mutually exclusive options",
465 "connect = false", "create_slot = true")));
466
467 if (opts->copy_data &&
468 IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
471 errmsg("%s and %s are mutually exclusive options",
472 "connect = false", "copy_data = true")));
473
474 /* Change the defaults of other options. */
475 opts->enabled = false;
476 opts->create_slot = false;
477 opts->copy_data = false;
478 }
479
480 /*
481 * Do additional checking for disallowed combination when slot_name = NONE
482 * was used.
483 */
484 if (!opts->slot_name &&
485 IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
486 {
487 if (opts->enabled)
488 {
489 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
492 /*- translator: both %s are strings of the form "option = value" */
493 errmsg("%s and %s are mutually exclusive options",
494 "slot_name = NONE", "enabled = true")));
495 else
498 /*- translator: both %s are strings of the form "option = value" */
499 errmsg("subscription with %s must also set %s",
500 "slot_name = NONE", "enabled = false")));
501 }
502
503 if (opts->create_slot)
504 {
505 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
508 /*- translator: both %s are strings of the form "option = value" */
509 errmsg("%s and %s are mutually exclusive options",
510 "slot_name = NONE", "create_slot = true")));
511 else
514 /*- translator: both %s are strings of the form "option = value" */
515 errmsg("subscription with %s must also set %s",
516 "slot_name = NONE", "create_slot = false")));
517 }
518 }
519}
int32 defGetInt32(DefElem *def)
Definition define.c:148
bool defGetBoolean(DefElem *def)
Definition define.c:93
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition define.c:370
bool parse_int(const char *value, int *result, int flags, const char **hintmsg)
Definition guc.c:2775
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition guc.c:3248
@ GUC_ACTION_SET
Definition guc.h:203
@ PGC_S_TEST
Definition guc.h:125
@ PGC_BACKEND
Definition guc.h:77
long val
Definition informix.c:689
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition pg_lsn.c:64
static XLogRecPtr DatumGetLSN(Datum X)
Definition pg_lsn.h:25
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:265
char defGetStreamingMode(DefElem *def)

References Assert, CStringGetDatum(), DatumGetLSN(), defGetBoolean(), defGetInt32(), defGetStreamingMode(), defGetString(), DirectFunctionCall1, ereport, errcode(), errmsg, ERROR, errorConflictingDefElem(), fb(), GUC_ACTION_SET, InvalidXLogRecPtr, IsSet, lfirst, opts, parse_int(), pfree(), pg_lsn_in(), pg_strcasecmp(), PGC_BACKEND, PGC_S_TEST, pstrdup(), ReplicationSlotValidateName(), set_config_option(), SUBOPT_BINARY, SUBOPT_CONNECT, SUBOPT_COPY_DATA, SUBOPT_CREATE_SLOT, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_FAILOVER, SUBOPT_LSN, SUBOPT_MAX_RETENTION_DURATION, SUBOPT_ORIGIN, SUBOPT_PASSWORD_REQUIRED, SUBOPT_REFRESH, SUBOPT_RETAIN_DEAD_TUPLES, SUBOPT_RUN_AS_OWNER, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, SUBOPT_WAL_RECEIVER_TIMEOUT, val, and XLogRecPtrIsValid.

Referenced by AlterSubscription().

◆ publicationListToArray()

static Datum publicationListToArray ( List publist)
static

Definition at line 616 of file subscriptioncmds.c.

617{
618 ArrayType *arr;
619 Datum *datums;
620 MemoryContext memcxt;
622
623 /* Create memory context for temporary allocations. */
625 "publicationListToArray to array",
628
630
632
634
636
637 MemoryContextDelete(memcxt);
638
639 return PointerGetDatum(arr);
640}
ArrayType * construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
#define palloc_array(type, count)
Definition fe_memutils.h:91
MemoryContext CurrentMemoryContext
Definition mcxt.c:161
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:475
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
#define PointerGetDatum(X)
Definition postgres.h:354

Referenced by AlterSubscription().

◆ ReplicationSlotDropAtPubNode()

void ReplicationSlotDropAtPubNode ( WalReceiverConn wrconn,
char slotname,
bool  missing_ok 
)

Definition at line 2538 of file subscriptioncmds.c.

2539{
2540 StringInfoData cmd;
2541
2542 Assert(wrconn);
2543
2544 load_file("libpqwalreceiver", false);
2545
2546 initStringInfo(&cmd);
2547 appendStringInfoString(&cmd, "DROP_REPLICATION_SLOT ");
2548 appendQuotedIdentifier(&cmd, slotname);
2549 appendStringInfoString(&cmd, " WAIT");
2550
2551 PG_TRY();
2552 {
2553 WalRcvExecResult *res;
2554
2555 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2556
2557 if (res->status == WALRCV_OK_COMMAND)
2558 {
2559 /* NOTICE. Success. */
2561 (errmsg("dropped replication slot \"%s\" on publisher",
2562 slotname)));
2563 }
2564 else if (res->status == WALRCV_ERROR &&
2565 missing_ok &&
2567 {
2568 /* LOG. Error, but missing_ok = true. */
2569 ereport(LOG,
2570 (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2571 slotname, res->err)));
2572 }
2573 else
2574 {
2575 /* ERROR. */
2576 ereport(ERROR,
2578 errmsg("could not drop replication slot \"%s\" on publisher: %s",
2579 slotname, res->err)));
2580 }
2581
2583 }
2584 PG_FINALLY();
2585 {
2586 pfree(cmd.data);
2587 }
2588 PG_END_TRY();
2589}
#define LOG
Definition elog.h:32
#define appendQuotedIdentifier(b, s)
@ WALRCV_OK_COMMAND
@ WALRCV_ERROR

References appendQuotedIdentifier, appendStringInfoString(), Assert, StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errmsg, ERROR, fb(), initStringInfo(), load_file(), LOG, NOTICE, pfree(), PG_END_TRY, PG_FINALLY, PG_TRY, WalRcvExecResult::sqlstate, WalRcvExecResult::status, walrcv_clear_result(), WALRCV_ERROR, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), and ProcessSyncingTablesForSync().

◆ ReportSlotConnectionError()

static void ReportSlotConnectionError ( List rstates,
Oid  subid,
char slotname,
char err 
)
static

Definition at line 3271 of file subscriptioncmds.c.

3272{
3273 ListCell *lc;
3274
3275 foreach(lc, rstates)
3276 {
3278 Oid relid = rstate->relid;
3279
3280 /* Only cleanup resources of tablesync workers */
3281 if (!OidIsValid(relid))
3282 continue;
3283
3284 /*
3285 * Caller needs to ensure that relstate doesn't change underneath us.
3286 * See DropSubscription where we get the relstates.
3287 */
3288 if (rstate->state != SUBREL_STATE_SYNCDONE)
3289 {
3290 char syncslotname[NAMEDATALEN] = {0};
3291
3293 sizeof(syncslotname));
3294 elog(WARNING, "could not drop tablesync replication slot \"%s\"",
3295 syncslotname);
3296 }
3297 }
3298
3299 ereport(ERROR,
3301 errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
3302 slotname, err),
3303 /* translator: %s is an SQL ALTER command */
3304 errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
3305 "ALTER SUBSCRIPTION ... DISABLE",
3306 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
3307}

References elog, ereport, err(), errcode(), errhint(), errmsg, ERROR, fb(), lfirst, NAMEDATALEN, OidIsValid, SubscriptionRelState::relid, ReplicationSlotNameForTablesync(), SubscriptionRelState::state, and WARNING.

Referenced by DropSubscription().