PostgreSQL Source Code git master
Loading...
Searching...
No Matches
subscriptioncmds.c File Reference
#include "postgres.h"
#include "access/commit_ts.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_database_d.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h"
#include "commands/tablecmds.h"
#include "executor/executor.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/syscache.h"
Include dependency graph for subscriptioncmds.c:

Go to the source code of this file.

Data Structures

struct  SubOpts
 
struct  PublicationRelKind
 

Macros

#define SUBOPT_CONNECT   0x00000001
 
#define SUBOPT_ENABLED   0x00000002
 
#define SUBOPT_CREATE_SLOT   0x00000004
 
#define SUBOPT_SLOT_NAME   0x00000008
 
#define SUBOPT_COPY_DATA   0x00000010
 
#define SUBOPT_SYNCHRONOUS_COMMIT   0x00000020
 
#define SUBOPT_REFRESH   0x00000040
 
#define SUBOPT_BINARY   0x00000080
 
#define SUBOPT_STREAMING   0x00000100
 
#define SUBOPT_TWOPHASE_COMMIT   0x00000200
 
#define SUBOPT_DISABLE_ON_ERR   0x00000400
 
#define SUBOPT_PASSWORD_REQUIRED   0x00000800
 
#define SUBOPT_RUN_AS_OWNER   0x00001000
 
#define SUBOPT_FAILOVER   0x00002000
 
#define SUBOPT_RETAIN_DEAD_TUPLES   0x00004000
 
#define SUBOPT_MAX_RETENTION_DURATION   0x00008000
 
#define SUBOPT_WAL_RECEIVER_TIMEOUT   0x00010000
 
#define SUBOPT_LSN   0x00020000
 
#define SUBOPT_ORIGIN   0x00040000
 
#define SUBOPT_CONFLICT_LOG_DEST   0x00080000
 
#define IsSet(val, bits)   (((val) & (bits)) == (bits))
 
#define appendQuotedIdentifier(b, s)   appendQuotedString(b, s, '"')
 
#define appendQuotedLiteral(b, s)   appendQuotedString(b, s, '\'')
 

Typedefs

typedef struct SubOpts SubOpts
 
typedef struct PublicationRelKind PublicationRelKind
 

Functions

static List * fetch_relation_list (WalReceiverConn *wrconn, List *publications)
 
static void check_publications_origin_tables (WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
 
static void check_publications_origin_sequences (WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
 
static void check_pub_dead_tuple_retention (WalReceiverConn *wrconn)
 
static void check_duplicates_in_publist (List *publist, Datum *datums)
 
static List * merge_publications (List *oldpublist, List *newpublist, bool addpub, const char *subname)
 
static void ReportSlotConnectionError (List *rstates, Oid subid, char *slotname, char *err)
 
static void CheckAlterSubOption (Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
 
static bool alter_sub_conflict_log_dest (Subscription *sub, ConflictLogDest oldlogdest, ConflictLogDest newlogdest, Oid *conflicttablerelid)
 
static void drop_sub_conflict_log_table (Oid subid, char *subname, Oid subconflictlogrelid)
 
static void parse_subscription_options (ParseState *pstate, List *stmt_options, uint32 supported_opts, SubOpts *opts)
 
static void appendQuotedString (StringInfo buf, const char *str, char quote)
 
static void check_publications (WalReceiverConn *wrconn, List *publications)
 
static Datum publicationListToArray (List *publist)
 
ObjectAddress CreateSubscription (ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
 
static void AlterSubscription_refresh (Subscription *sub, bool copy_data, List *validate_publications)
 
static void AlterSubscription_refresh_seq (Subscription *sub)
 
ObjectAddress AlterSubscription (ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
 
static char * construct_subserver_conninfo (Oid subserver, Oid subowner, char **err)
 
void DropSubscription (DropSubscriptionStmt *stmt, bool isTopLevel)
 
void ReplicationSlotDropAtPubNode (WalReceiverConn *wrconn, char *slotname, bool missing_ok)
 
static void AlterSubscriptionOwner_internal (Relation rel, HeapTuple tup, Oid newOwnerId)
 
ObjectAddress AlterSubscriptionOwner (const char *name, Oid newOwnerId)
 
void AlterSubscriptionOwner_oid (Oid subid, Oid newOwnerId)
 
void CheckSubDeadTupleRetention (bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
 
static bool list_member_rangevar (const List *list, RangeVar *rv)
 
char defGetStreamingMode (DefElem *def)
 

Macro Definition Documentation

◆ appendQuotedIdentifier

#define appendQuotedIdentifier (   b,
  s 
)    appendQuotedString(b, s, '"')

Definition at line 567 of file subscriptioncmds.c.

◆ appendQuotedLiteral

#define appendQuotedLiteral (   b,
  s 
)    appendQuotedString(b, s, '\'')

Definition at line 568 of file subscriptioncmds.c.

◆ IsSet

#define IsSet (   val,
  bits 
)    (((val) & (bits)) == (bits))

Definition at line 87 of file subscriptioncmds.c.

◆ SUBOPT_BINARY

#define SUBOPT_BINARY   0x00000080

Definition at line 72 of file subscriptioncmds.c.

◆ SUBOPT_CONFLICT_LOG_DEST

#define SUBOPT_CONFLICT_LOG_DEST   0x00080000

Definition at line 84 of file subscriptioncmds.c.

◆ SUBOPT_CONNECT

#define SUBOPT_CONNECT   0x00000001

Definition at line 65 of file subscriptioncmds.c.

◆ SUBOPT_COPY_DATA

#define SUBOPT_COPY_DATA   0x00000010

Definition at line 69 of file subscriptioncmds.c.

◆ SUBOPT_CREATE_SLOT

#define SUBOPT_CREATE_SLOT   0x00000004

Definition at line 67 of file subscriptioncmds.c.

◆ SUBOPT_DISABLE_ON_ERR

#define SUBOPT_DISABLE_ON_ERR   0x00000400

Definition at line 75 of file subscriptioncmds.c.

◆ SUBOPT_ENABLED

#define SUBOPT_ENABLED   0x00000002

Definition at line 66 of file subscriptioncmds.c.

◆ SUBOPT_FAILOVER

#define SUBOPT_FAILOVER   0x00002000

Definition at line 78 of file subscriptioncmds.c.

◆ SUBOPT_LSN

#define SUBOPT_LSN   0x00020000

Definition at line 82 of file subscriptioncmds.c.

◆ SUBOPT_MAX_RETENTION_DURATION

#define SUBOPT_MAX_RETENTION_DURATION   0x00008000

Definition at line 80 of file subscriptioncmds.c.

◆ SUBOPT_ORIGIN

#define SUBOPT_ORIGIN   0x00040000

Definition at line 83 of file subscriptioncmds.c.

◆ SUBOPT_PASSWORD_REQUIRED

#define SUBOPT_PASSWORD_REQUIRED   0x00000800

Definition at line 76 of file subscriptioncmds.c.

◆ SUBOPT_REFRESH

#define SUBOPT_REFRESH   0x00000040

Definition at line 71 of file subscriptioncmds.c.

◆ SUBOPT_RETAIN_DEAD_TUPLES

#define SUBOPT_RETAIN_DEAD_TUPLES   0x00004000

Definition at line 79 of file subscriptioncmds.c.

◆ SUBOPT_RUN_AS_OWNER

#define SUBOPT_RUN_AS_OWNER   0x00001000

Definition at line 77 of file subscriptioncmds.c.

◆ SUBOPT_SLOT_NAME

#define SUBOPT_SLOT_NAME   0x00000008

Definition at line 68 of file subscriptioncmds.c.

◆ SUBOPT_STREAMING

#define SUBOPT_STREAMING   0x00000100

Definition at line 73 of file subscriptioncmds.c.

◆ SUBOPT_SYNCHRONOUS_COMMIT

#define SUBOPT_SYNCHRONOUS_COMMIT   0x00000020

Definition at line 70 of file subscriptioncmds.c.

◆ SUBOPT_TWOPHASE_COMMIT

#define SUBOPT_TWOPHASE_COMMIT   0x00000200

Definition at line 74 of file subscriptioncmds.c.

◆ SUBOPT_WAL_RECEIVER_TIMEOUT

#define SUBOPT_WAL_RECEIVER_TIMEOUT   0x00010000

Definition at line 81 of file subscriptioncmds.c.

Typedef Documentation

◆ PublicationRelKind

typedef struct PublicationRelKind PublicationRelKind

◆ SubOpts

typedef struct SubOpts SubOpts

Function Documentation

◆ alter_sub_conflict_log_dest()

static bool alter_sub_conflict_log_dest ( Subscription * sub,
ConflictLogDest  oldlogdest,
ConflictLogDest  newlogdest,
Oid * conflicttablerelid 
)
static

Definition at line 1516 of file subscriptioncmds.c.

1519{
1520 bool want_table;
1521 bool has_oldtable;
1522 bool update_relid = false;
1523 Oid relid = InvalidOid;
1524
1527
1528 if (has_oldtable)
1529 {
1530 /* There is a conflict log table already. */
1531 if (!want_table)
1532 {
1534 sub->conflictlogrelid);
1535 update_relid = true;
1536 }
1537 }
1538 else
1539 {
1540 /* There was no previous conflict log table. */
1541 if (want_table)
1542 {
1545
1546 relid = create_conflict_log_table(sub->oid, sub->name, sub->owner);
1547 update_relid = true;
1548
1549 /*
1550 * Establish an internal dependency between the conflict log table
1551 * and the subscription. For details, refer to the comments in
1552 * the CreateSubscription function.
1553 */
1557 }
1558 }
1559
1560 *conflicttablerelid = relid;
1561 return update_relid;
1562}
Oid create_conflict_log_table(Oid subid, char *subname, Oid subowner)
Definition conflict.c:147
#define CONFLICTS_LOGGED_TO_TABLE(dest)
Definition conflict.h:96
@ DEPENDENCY_INTERNAL
Definition dependency.h:35
#define ObjectAddressSet(addr, class_id, object_id)
void recordDependencyOn(const ObjectAddress *depender, const ObjectAddress *referenced, DependencyType behavior)
Definition pg_depend.c:51
#define InvalidOid
unsigned int Oid
static int fb(int x)
static void drop_sub_conflict_log_table(Oid subid, char *subname, Oid subconflictlogrelid)

References Subscription::conflictlogrelid, CONFLICTS_LOGGED_TO_TABLE, create_conflict_log_table(), DEPENDENCY_INTERNAL, drop_sub_conflict_log_table(), fb(), InvalidOid, Subscription::name, ObjectAddressSet, Subscription::oid, Subscription::owner, and recordDependencyOn().

Referenced by AlterSubscription().

◆ AlterSubscription()

ObjectAddress AlterSubscription ( ParseState * pstate,
AlterSubscriptionStmt * stmt,
bool  isTopLevel 
)

Definition at line 1568 of file subscriptioncmds.c.

1570{
1571 Relation rel;
1573 bool nulls[Natts_pg_subscription];
1576 HeapTuple tup;
1577 Oid subid;
1578 bool orig_conninfo_needed = true;
1579 bool update_tuple = false;
1580 bool update_failover = false;
1581 bool update_two_phase = false;
1582 bool check_pub_rdt = false;
1583 bool retain_dead_tuples;
1584 int max_retention;
1585 bool retention_active;
1586 char *new_conninfo = NULL;
1587 char *origin;
1588 Subscription *sub;
1591 SubOpts opts = {0};
1592
1594
1595 /* Fetch the existing tuple. */
1597 CStringGetDatum(stmt->subname));
1598
1599 if (!HeapTupleIsValid(tup))
1600 ereport(ERROR,
1602 errmsg("subscription \"%s\" does not exist",
1603 stmt->subname)));
1604
1606 subid = form->oid;
1607
1608 /* must be owner */
1611 stmt->subname);
1612
1613 /* parse and check options */
1614 switch (stmt->kind)
1615 {
1628 break;
1629
1632 break;
1633
1636 break;
1637
1641 break;
1642
1645 break;
1646
1649 break;
1650
1651 default:
1652 supported_opts = 0;
1653 break;
1654 }
1655
1656 if (supported_opts > 0)
1658
1659 /*
1660 * Ensure that ALTER SUBSCRIPTION commands that could be used to fix a
1661 * broken connection or prepare to drop a broken subscription don't
1662 * attempt to construct the conninfo. Otherwise, we might encounter the
1663 * error the user is trying to fix.
1664 *
1665 * Specifically, ALTER SUBSCRIPTION DISABLE, ALTER SUBSCRIPTION SERVER,
1666 * ALTER SUBSCRIPTION CONNECTION, or ALTER SUBSCRIPTION SET
1667 * (slot_name=NONE).
1668 *
1669 * NB: if the user specifies multiple SET options, then we may still need
1670 * to construct conninfo even if slot_name is set to NONE.
1671 */
1672 if (stmt->kind == ALTER_SUBSCRIPTION_ENABLED)
1673 {
1674 if (opts.specified_opts == SUBOPT_ENABLED && !opts.enabled)
1675 orig_conninfo_needed = false;
1676 }
1677 else if (stmt->kind == ALTER_SUBSCRIPTION_SERVER ||
1679 {
1680 orig_conninfo_needed = false;
1681 }
1682 else if (stmt->kind == ALTER_SUBSCRIPTION_OPTIONS)
1683 {
1684 /* ... SET (slot_name = NONE) with no other options */
1685 if (opts.specified_opts == SUBOPT_SLOT_NAME && !opts.slot_name)
1686 orig_conninfo_needed = false;
1687 }
1688
1689 /*
1690 * Skip ACL checks on the subscription's foreign server, if any. If
1691 * changing the server (or replacing it with a raw connection), then the
1692 * old one will be removed anyway. If changing something unrelated,
1693 * there's no need to do an additional ACL check here; that will be done
1694 * by the subscription worker.
1695 */
1696 sub = GetSubscription(subid, false, orig_conninfo_needed, false);
1697
1699 origin = sub->origin;
1702
1703 /*
1704 * Don't allow non-superuser modification of a subscription with
1705 * password_required=false.
1706 */
1707 if (!sub->passwordrequired && !superuser())
1708 ereport(ERROR,
1710 errmsg("password_required=false is superuser-only"),
1711 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1712
1713 /* Lock the subscription so nobody else can do anything with it. */
1715
1716 /* Form a new tuple. */
1717 memset(values, 0, sizeof(values));
1718 memset(nulls, false, sizeof(nulls));
1719 memset(replaces, false, sizeof(replaces));
1720
1722
1723 switch (stmt->kind)
1724 {
1726 {
1727 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1728 {
1729 /*
1730 * The subscription must be disabled to allow slot_name as
1731 * 'none', otherwise, the apply worker will repeatedly try
1732 * to stream the data using that slot_name which neither
1733 * exists on the publisher nor the user will be allowed to
1734 * create it.
1735 */
1736 if (sub->enabled && !opts.slot_name)
1737 ereport(ERROR,
1739 errmsg("cannot set %s for enabled subscription",
1740 "slot_name = NONE")));
1741
1742 if (opts.slot_name)
1745 else
1746 nulls[Anum_pg_subscription_subslotname - 1] = true;
1748 }
1749
1750 if (opts.synchronous_commit)
1751 {
1753 CStringGetTextDatum(opts.synchronous_commit);
1755 }
1756
1757 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1758 {
1760 BoolGetDatum(opts.binary);
1762 }
1763
1764 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1765 {
1767 CharGetDatum(opts.streaming);
1769 }
1770
1771 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1772 {
1774 = BoolGetDatum(opts.disableonerr);
1776 = true;
1777 }
1778
1779 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1780 {
1781 /* Non-superuser may not disable password_required. */
1782 if (!opts.passwordrequired && !superuser())
1783 ereport(ERROR,
1785 errmsg("password_required=false is superuser-only"),
1786 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1787
1789 = BoolGetDatum(opts.passwordrequired);
1791 = true;
1792 }
1793
1794 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1795 {
1797 BoolGetDatum(opts.runasowner);
1799 }
1800
1801 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1802 {
1803 /*
1804 * We need to update both the slot and the subscription
1805 * for the two_phase option. We can enable the two_phase
1806 * option for a slot only once the initial data
1807 * synchronization is done. This is to avoid missing some
1808 * data as explained in comments atop worker.c.
1809 */
1810 update_two_phase = !opts.twophase;
1811
1812 CheckAlterSubOption(sub, "two_phase", update_two_phase,
1813 isTopLevel);
1814
1815 /*
1816 * Modifying the two_phase slot option requires a slot
1817 * lookup by slot name, so changing the slot name at the
1818 * same time is not allowed.
1819 */
1820 if (update_two_phase &&
1821 IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1822 ereport(ERROR,
1824 errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1825
1826 /*
1827 * Note that workers may still survive even if the
1828 * subscription has been disabled.
1829 *
1830 * Ensure workers have already been exited to avoid
1831 * getting prepared transactions while we are disabling
1832 * the two_phase option. Otherwise, the changes of an
1833 * already prepared transaction can be replicated again
1834 * along with its corresponding commit, leading to
1835 * duplicate data or errors.
1836 */
1837 if (logicalrep_workers_find(subid, true, true))
1838 ereport(ERROR,
1840 errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1841 errhint("Try again after some time.")));
1842
1843 /*
1844 * two_phase cannot be disabled if there are any
1845 * uncommitted prepared transactions present otherwise it
1846 * can lead to duplicate data or errors as explained in
1847 * the comment above.
1848 */
1849 if (update_two_phase &&
1851 LookupGXactBySubid(subid))
1852 ereport(ERROR,
1854 errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1855 errhint("Resolve these transactions and try again.")));
1856
1857 /* Change system catalog accordingly */
1859 CharGetDatum(opts.twophase ?
1863 }
1864
1865 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1866 {
1867 /*
1868 * Similar to the two_phase case above, we need to update
1869 * the failover option for both the slot and the
1870 * subscription.
1871 */
1872 update_failover = true;
1873
1874 CheckAlterSubOption(sub, "failover", update_failover,
1875 isTopLevel);
1876
1878 BoolGetDatum(opts.failover);
1880 }
1881
1882 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1883 {
1885 BoolGetDatum(opts.retaindeadtuples);
1887
1888 /*
1889 * Update the retention status only if there's a change in
1890 * the retain_dead_tuples option value.
1891 *
1892 * Automatically marking retention as active when
1893 * retain_dead_tuples is enabled may not always be ideal,
1894 * especially if retention was previously stopped and the
1895 * user toggles retain_dead_tuples without adjusting the
1896 * publisher workload. However, this behavior provides a
1897 * convenient way for users to manually refresh the
1898 * retention status. Since retention will be stopped again
1899 * unless the publisher workload is reduced, this approach
1900 * is acceptable for now.
1901 */
1902 if (opts.retaindeadtuples != sub->retaindeadtuples)
1903 {
1905 BoolGetDatum(opts.retaindeadtuples);
1907
1908 retention_active = opts.retaindeadtuples;
1909 }
1910
1911 CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1912
1913 /*
1914 * Workers may continue running even after the
1915 * subscription has been disabled.
1916 *
1917 * To prevent race conditions (as described in
1918 * CheckAlterSubOption()), ensure that all worker
1919 * processes have already exited before proceeding.
1920 */
1921 if (logicalrep_workers_find(subid, true, true))
1922 ereport(ERROR,
1924 errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1925 errhint("Try again after some time.")));
1926
1927 /*
1928 * Notify the launcher to manage the replication slot for
1929 * conflict detection. This ensures that replication slot
1930 * is efficiently handled (created, updated, or dropped)
1931 * in response to any configuration changes.
1932 */
1934
1935 check_pub_rdt = opts.retaindeadtuples;
1936 retain_dead_tuples = opts.retaindeadtuples;
1937 }
1938
1939 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1940 {
1942 Int32GetDatum(opts.maxretention);
1944
1945 max_retention = opts.maxretention;
1946 }
1947
1948 /*
1949 * Ensure that system configuration parameters are set
1950 * appropriately to support retain_dead_tuples and
1951 * max_retention_duration.
1952 */
1953 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1954 IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1958 (max_retention > 0));
1959
1960 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1961 {
1963 CStringGetTextDatum(opts.origin);
1965
1966 /*
1967 * Check if changes from different origins may be received
1968 * from the publisher when the origin is changed to ANY
1969 * and retain_dead_tuples is enabled. Use |= so that we
1970 * don't clear the flag already set when
1971 * retain_dead_tuples was changed in the same command.
1972 */
1975
1976 origin = opts.origin;
1977 }
1978
1979 if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
1980 {
1982 CStringGetTextDatum(opts.wal_receiver_timeout);
1984 }
1985
1986 if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_DEST))
1987 {
1990
1991 if (opts.conflictlogdest != old_dest)
1992 {
1993 bool update_relid;
1994 Oid relid = InvalidOid;
1995
1999
2001 old_dest,
2002 opts.conflictlogdest,
2003 &relid);
2004 if (update_relid)
2005 {
2007 ObjectIdGetDatum(relid);
2009 true;
2010 }
2011 }
2012 }
2013
2014 update_tuple = true;
2015 break;
2016 }
2017
2019 {
2020 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
2021
2022 if (!sub->slotname && opts.enabled)
2023 ereport(ERROR,
2025 errmsg("cannot enable subscription that does not have a slot name")));
2026
2027 /*
2028 * Check track_commit_timestamp only when enabling the
2029 * subscription in case it was disabled after creation. See
2030 * comments atop CheckSubDeadTupleRetention() for details.
2031 */
2032 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
2034 sub->retentionactive, false);
2035
2037 BoolGetDatum(opts.enabled);
2039
2040 if (opts.enabled)
2042
2043 update_tuple = true;
2044
2045 /*
2046 * The subscription might be initially created with
2047 * connect=false and retain_dead_tuples=true, meaning the
2048 * remote server's status may not be checked. Ensure this
2049 * check is conducted now.
2050 */
2051 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
2052 break;
2053 }
2054
2056 {
2060
2061 /*
2062 * Remove what was there before, either another foreign server
2063 * or a connection string.
2064 */
2065 if (form->subserver)
2066 {
2069 ForeignServerRelationId, form->subserver);
2070 }
2071 else
2072 {
2073 nulls[Anum_pg_subscription_subconninfo - 1] = true;
2075 }
2076
2077 /*
2078 * Check that the subscription owner has USAGE privileges on
2079 * the server.
2080 */
2081 new_server = GetForeignServerByName(stmt->servername, false);
2083 new_server->serverid,
2084 form->subowner, ACL_USAGE);
2085 if (aclresult != ACLCHECK_OK)
2086 ereport(ERROR,
2088 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
2089 GetUserNameFromId(form->subowner, false),
2090 new_server->servername));
2091
2092 /* make sure a user mapping exists */
2093 GetUserMapping(form->subowner, new_server->serverid);
2094
2096 new_server);
2097
2098 /* Load the library providing us libpq calls. */
2099 load_file("libpqwalreceiver", false);
2100 /* Check the connection info string. */
2102 sub->passwordrequired && !sub->ownersuperuser);
2103
2106
2109
2110 update_tuple = true;
2111 }
2112
2113 /*
2114 * Since the remote server configuration might have changed,
2115 * perform a check to ensure it permits enabling
2116 * retain_dead_tuples.
2117 */
2119 break;
2120
2122 /* remove reference to foreign server and dependencies, if present */
2123 if (form->subserver)
2124 {
2127 ForeignServerRelationId, form->subserver);
2128
2131 }
2132
2133 new_conninfo = stmt->conninfo;
2134
2135 /* Load the library providing us libpq calls. */
2136 load_file("libpqwalreceiver", false);
2137 /* Check the connection info string. */
2139 sub->passwordrequired && !sub->ownersuperuser);
2140
2142 CStringGetTextDatum(stmt->conninfo);
2144 update_tuple = true;
2145
2146 /*
2147 * Since the remote server configuration might have changed,
2148 * perform a check to ensure it permits enabling
2149 * retain_dead_tuples.
2150 */
2152 break;
2153
2155 {
2157 publicationListToArray(stmt->publication);
2159
2160 update_tuple = true;
2161
2162 /* Refresh if user asked us to. */
2163 if (opts.refresh)
2164 {
2165 if (!sub->enabled)
2166 ereport(ERROR,
2168 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
2169 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
2170
2171 /*
2172 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
2173 * why this is not allowed.
2174 */
2175 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
2176 ereport(ERROR,
2178 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
2179 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
2180
2181 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
2182
2183 /* Make sure refresh sees the new list of publications. */
2184 sub->publications = stmt->publication;
2185
2186 AlterSubscription_refresh(sub, opts.copy_data,
2187 stmt->publication);
2188 }
2189
2190 break;
2191 }
2192
2195 {
2196 List *publist;
2198
2199 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
2203
2204 update_tuple = true;
2205
2206 /* Refresh if user asked us to. */
2207 if (opts.refresh)
2208 {
2209 /* We only need to validate user specified publications. */
2210 List *validate_publications = (isadd) ? stmt->publication : NULL;
2211
2212 if (!sub->enabled)
2213 ereport(ERROR,
2215 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
2216 /* translator: %s is an SQL ALTER command */
2217 errhint("Use %s instead.",
2218 isadd ?
2219 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
2220 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
2221
2222 /*
2223 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
2224 * why this is not allowed.
2225 */
2226 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
2227 ereport(ERROR,
2229 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
2230 /* translator: %s is an SQL ALTER command */
2231 errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
2232 isadd ?
2233 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
2234 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
2235
2236 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
2237
2238 /* Refresh the new list of publications. */
2239 sub->publications = publist;
2240
2241 AlterSubscription_refresh(sub, opts.copy_data,
2243 }
2244
2245 break;
2246 }
2247
2249 {
2250 if (!sub->enabled)
2251 ereport(ERROR,
2253 errmsg("%s is not allowed for disabled subscriptions",
2254 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
2255
2256 /*
2257 * The subscription option "two_phase" requires that
2258 * replication has passed the initial table synchronization
2259 * phase before the two_phase becomes properly enabled.
2260 *
2261 * But, having reached this two-phase commit "enabled" state
2262 * we must not allow any subsequent table initialization to
2263 * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
2264 * disallowed when the user had requested two_phase = on mode.
2265 *
2266 * The exception to this restriction is when copy_data =
2267 * false, because when copy_data is false the tablesync will
2268 * start already in READY state and will exit directly without
2269 * doing anything.
2270 *
2271 * For more details see comments atop worker.c.
2272 */
2273 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
2274 ereport(ERROR,
2276 errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
2277 errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
2278
2279 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
2280
2281 AlterSubscription_refresh(sub, opts.copy_data, NULL);
2282
2283 break;
2284 }
2285
2287 {
2288 if (!sub->enabled)
2289 ereport(ERROR,
2291 errmsg("%s is not allowed for disabled subscriptions",
2292 "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
2293
2295
2296 break;
2297 }
2298
2300 {
2301 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
2302 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
2303
2304 /*
2305 * If the user sets subskiplsn, we do a sanity check to make
2306 * sure that the specified LSN is a probable value.
2307 */
2308 if (XLogRecPtrIsValid(opts.lsn))
2309 {
2311 char originname[NAMEDATALEN];
2312 XLogRecPtr remote_lsn;
2313
2315 originname, sizeof(originname));
2317 remote_lsn = replorigin_get_progress(originid, false);
2318
2319 /* Check the given LSN is at least a future LSN */
2320 if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
2321 ereport(ERROR,
2323 errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
2324 LSN_FORMAT_ARGS(opts.lsn),
2325 LSN_FORMAT_ARGS(remote_lsn))));
2326 }
2327
2330
2331 update_tuple = true;
2332 break;
2333 }
2334
2335 default:
2336 elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
2337 stmt->kind);
2338 }
2339
2340 /* Update the catalog if needed. */
2341 if (update_tuple)
2342 {
2344 replaces);
2345
2346 CatalogTupleUpdate(rel, &tup->t_self, tup);
2347
2349 }
2350
2351 /*
2352 * Try to acquire the connection necessary either for modifying the slot
2353 * or for checking if the remote server permits enabling
2354 * retain_dead_tuples.
2355 *
2356 * This has to be at the end because otherwise if there is an error while
1357 * doing the database operations we won't be able to roll back the
1358 * altered slot.
2359 */
2361 {
2362 bool must_use_password;
2363 char *err;
2365
2367
2368 /* Load the library providing us libpq calls. */
2369 load_file("libpqwalreceiver", false);
2370
2371 /*
2372 * Try to connect to the publisher, using the new connection string if
2373 * available.
2374 */
2376 wrconn = walrcv_connect(new_conninfo ? new_conninfo : sub->conninfo,
2378 &err);
2379 if (!wrconn)
2380 ereport(ERROR,
2382 errmsg("subscription \"%s\" could not connect to the publisher: %s",
2383 sub->name, err)));
2384
2385 PG_TRY();
2386 {
2389
2391 retain_dead_tuples, origin, NULL, 0,
2392 sub->name);
2393
2396 update_failover ? &opts.failover : NULL,
2397 update_two_phase ? &opts.twophase : NULL);
2398 }
2399 PG_FINALLY();
2400 {
2402 }
2403 PG_END_TRY();
2404 }
2405
2407
2409
2410 /* Wake up related replication workers to handle this change quickly. */
2412
2413 return myself;
2414}
AclResult
Definition acl.h:183
@ ACLCHECK_OK
Definition acl.h:184
@ ACLCHECK_NOT_OWNER
Definition acl.h:186
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition aclchk.c:2672
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition aclchk.c:3902
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition aclchk.c:4156
void LogicalRepWorkersWakeupAtCommit(Oid subid)
Definition worker.c:6334
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:648
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define Assert(condition)
Definition c.h:1002
uint32_t uint32
Definition c.h:683
const char *const ConflictLogDestNames[]
Definition conflict.c:34
ConflictLogDest GetConflictLogDest(const char *dest)
Definition conflict.c:210
ConflictLogDest
Definition conflict.h:90
@ DEPENDENCY_NORMAL
Definition dependency.h:33
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
int errcode(int sqlerrcode)
Definition elog.c:875
int errhint(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:374
#define WARNING
Definition elog.h:37
#define PG_END_TRY(...)
Definition elog.h:399
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define NOTICE
Definition elog.h:36
#define PG_FINALLY(...)
Definition elog.h:391
#define ereport(elevel,...)
Definition elog.h:152
void err(int eval, const char *fmt,...)
Definition err.c:43
#define DirectFunctionCall1(func, arg1)
Definition fmgr.h:688
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition foreign.c:185
char * ForeignServerConnectionString(Oid userid, ForeignServer *server)
Definition foreign.c:202
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition foreign.c:232
Oid MyDatabaseId
Definition globals.c:96
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1118
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define stmt
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
return true
Definition isn.c:130
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:303
void ApplyLauncherWakeupAtCommit(void)
Definition launcher.c:1185
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define AccessExclusiveLock
Definition lockdefs.h:43
#define RowExclusiveLock
Definition lockdefs.h:38
Oid GetUserId(void)
Definition miscinit.c:470
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition miscinit.c:990
Datum namein(PG_FUNCTION_ARGS)
Definition name.c:48
static char * errmsg
#define InvokeObjectPostAlterHook(classId, objectId, subId)
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:243
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
Definition origin.c:1057
@ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SERVER
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH_SEQUENCES
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
#define ACL_USAGE
Definition parsenodes.h:84
@ OBJECT_SUBSCRIPTION
static AmcheckOptions opts
Definition pg_amcheck.c:112
#define NAMEDATALEN
long deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char deptype, Oid refclassId, Oid refobjectId)
Definition pg_depend.c:411
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
Subscription * GetSubscription(Oid subid, bool missing_ok, bool conninfo_needed, bool conninfo_aclcheck)
END_CATALOG_STRUCT typedef FormData_pg_subscription * Form_pg_subscription
int pg_strcasecmp(const char *s1, const char *s2)
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static Datum CStringGetDatum(const char *X)
Definition postgres.h:383
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define RelationGetDescr(relation)
Definition rel.h:542
Definition pg_list.h:54
#define SUBOPT_STREAMING
#define SUBOPT_PASSWORD_REQUIRED
#define SUBOPT_SYNCHRONOUS_COMMIT
#define SUBOPT_ENABLED
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
#define SUBOPT_RETAIN_DEAD_TUPLES
#define SUBOPT_ORIGIN
static Datum publicationListToArray(List *publist)
#define SUBOPT_FAILOVER
static bool alter_sub_conflict_log_dest(Subscription *sub, ConflictLogDest oldlogdest, ConflictLogDest newlogdest, Oid *conflicttablerelid)
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_SLOT_NAME
#define SUBOPT_COPY_DATA
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
#define SUBOPT_DISABLE_ON_ERR
#define SUBOPT_CONFLICT_LOG_DEST
static void parse_subscription_options(ParseState *pstate, List *stmt_options, uint32 supported_opts, SubOpts *opts)
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
static void AlterSubscription_refresh_seq(Subscription *sub)
#define SUBOPT_LSN
#define SUBOPT_MAX_RETENTION_DURATION
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
#define SUBOPT_WAL_RECEIVER_TIMEOUT
static void check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
#define SUBOPT_BINARY
#define IsSet(val, bits)
#define SUBOPT_REFRESH
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
bool superuser(void)
Definition superuser.c:47
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition syscache.h:93
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
bool LookupGXactBySubid(Oid subid)
Definition twophase.c:2803
const char * name
static WalReceiverConn * wrconn
Definition walreceiver.c:95
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_disconnect(conn)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3701
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21

References AccessExclusiveLock, ACL_USAGE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, alter_sub_conflict_log_dest(), ALTER_SUBSCRIPTION_ADD_PUBLICATION, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_DROP_PUBLICATION, ALTER_SUBSCRIPTION_ENABLED, ALTER_SUBSCRIPTION_OPTIONS, ALTER_SUBSCRIPTION_REFRESH_PUBLICATION, ALTER_SUBSCRIPTION_REFRESH_SEQUENCES, ALTER_SUBSCRIPTION_SERVER, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_SKIP, AlterSubscription_refresh(), AlterSubscription_refresh_seq(), ApplyLauncherWakeupAtCommit(), Assert, BoolGetDatum(), CatalogTupleUpdate(), CharGetDatum(), check_pub_dead_tuple_retention(), check_publications_origin_tables(), CheckAlterSubOption(), CheckSubDeadTupleRetention(), Subscription::conflictlogdest, ConflictLogDestNames, Subscription::conninfo, CStringGetDatum(), CStringGetTextDatum, deleteDependencyRecordsForSpecific(), DEPENDENCY_NORMAL, DirectFunctionCall1, elog, Subscription::enabled, ereport, err(), errcode(), errhint(), errmsg, ERROR, fb(), ForeignServerConnectionString(), Form_pg_subscription, GetConflictLogDest(), GetForeignServerByName(), GETSTRUCT(), GetSubscription(), GetUserId(), GetUserMapping(), GetUserNameFromId(), heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, Int32GetDatum(), InvalidOid, InvokeObjectPostAlterHook, IsSet, load_file(), LockSharedObject(), logicalrep_workers_find(), LogicalRepWorkersWakeupAtCommit(), LookupGXactBySubid(), LSN_FORMAT_ARGS, LSNGetDatum(), Subscription::maxretention, merge_publications(), MyDatabaseId, Subscription::name, NAMEDATALEN, namein(), NOTICE, object_aclcheck(), object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, ObjectIdGetDatum(), opts, Subscription::origin, Subscription::ownersuperuser, parse_subscription_options(), Subscription::passwordrequired, PG_END_TRY, PG_FINALLY, pg_strcasecmp(), PG_TRY, PreventInTransactionBlock(), publicationListToArray(), Subscription::publications, recordDependencyOn(), RelationGetDescr, 
ReplicationOriginNameForLogicalRep(), replorigin_by_name(), replorigin_get_progress(), Subscription::retaindeadtuples, Subscription::retentionactive, RowExclusiveLock, SearchSysCacheCopy2, Subscription::slotname, stmt, SUBOPT_BINARY, SUBOPT_CONFLICT_LOG_DEST, SUBOPT_COPY_DATA, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_FAILOVER, SUBOPT_LSN, SUBOPT_MAX_RETENTION_DURATION, SUBOPT_ORIGIN, SUBOPT_PASSWORD_REQUIRED, SUBOPT_REFRESH, SUBOPT_RETAIN_DEAD_TUPLES, SUBOPT_RUN_AS_OWNER, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, SUBOPT_WAL_RECEIVER_TIMEOUT, superuser(), table_close(), table_open(), Subscription::twophasestate, values, walrcv_alter_slot, walrcv_check_conninfo, walrcv_connect, walrcv_disconnect, WARNING, wrconn, and XLogRecPtrIsValid.

Referenced by ProcessUtilitySlow().

◆ AlterSubscription_refresh()

static void AlterSubscription_refresh ( Subscription *sub,
bool  copy_data,
List *validate_publications 
)
static

Definition at line 1089 of file subscriptioncmds.c.

1091{
1092 char *err;
1093 List *pubrels = NIL;
1099 int subrel_count;
1100 ListCell *lc;
1101 int off;
1102 int tbl_count = 0;
1103 int seq_count = 0;
1104 Relation rel = NULL;
1105 typedef struct SubRemoveRels
1106 {
1107 Oid relid;
1108 char state;
1109 } SubRemoveRels;
1110
1112 bool must_use_password;
1113
1114 /* Load the library providing us libpq calls. */
1115 load_file("libpqwalreceiver", false);
1116
1117 /* Try to connect to the publisher. */
1119 wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1120 sub->name, &err);
1121 if (!wrconn)
1122 ereport(ERROR,
1124 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1125 sub->name, err)));
1126
1127 PG_TRY();
1128 {
1131
1132 /* Get the relation list from publisher. */
1134
1135 /* Get local relation list. */
1136 subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
1138
1139 /*
1140 * Build qsorted arrays of local table oids and sequence oids for
1141 * faster lookup. This can potentially contain all tables and
1142 * sequences in the database so speed of lookup is important.
1143 *
1144 * We do not yet know the exact count of tables and sequences, so we
1145 * allocate separate arrays for table OIDs and sequence OIDs based on
1146 * the total number of relations (subrel_count).
1147 */
1150 foreach(lc, subrel_states)
1151 {
1153
1154 if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
1155 subseq_local_oids[seq_count++] = relstate->relid;
1156 else
1157 subrel_local_oids[tbl_count++] = relstate->relid;
1158 }
1159
1162 sub->retaindeadtuples, sub->origin,
1164 sub->name);
1165
1168 copy_data, sub->origin,
1170 sub->name);
1171
1172 /*
1173 * Walk over the remote relations and try to match them to locally
1174 * known relations. If the relation is not known locally create a new
1175 * state for it.
1176 *
1177 * Also builds array of local oids of remote relations for the next
1178 * step.
1179 */
1180 off = 0;
1182
1184 {
1185 RangeVar *rv = pubrelinfo->rv;
1186 Oid relid;
1187 char relkind;
1188
1189 relid = RangeVarGetRelid(rv, AccessShareLock, false);
1190 relkind = get_rel_relkind(relid);
1191
1192 /* Check for supported relkind. */
1193 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
1194 rv->schemaname, rv->relname);
1195
1196 pubrel_local_oids[off++] = relid;
1197
1198 if (!bsearch(&relid, subrel_local_oids,
1199 tbl_count, sizeof(Oid), oid_cmp) &&
1200 !bsearch(&relid, subseq_local_oids,
1201 seq_count, sizeof(Oid), oid_cmp))
1202 {
1203 AddSubscriptionRelState(sub->oid, relid,
1207 errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
1208 relkind == RELKIND_SEQUENCE ? "sequence" : "table",
1209 rv->schemaname, rv->relname, sub->name));
1210 }
1211 }
1212
1213 /*
1214 * Next remove state for tables we should not care about anymore using
1215 * the data we collected above
1216 */
1218
1219 for (off = 0; off < tbl_count; off++)
1220 {
1221 Oid relid = subrel_local_oids[off];
1222
1223 if (!bsearch(&relid, pubrel_local_oids,
1224 list_length(pubrels), sizeof(Oid), oid_cmp))
1225 {
1226 char state;
1227 XLogRecPtr statelsn;
1229
1230 /*
1231 * Lock pg_subscription_rel with AccessExclusiveLock to
1232 * prevent any race conditions with the apply worker
1233 * re-launching workers at the same time this code is trying
1234 * to remove those tables.
1235 *
1236 * Even if a new worker for this particular rel is restarted, it
1237 * won't be able to make any progress as we hold exclusive
1238 * lock on pg_subscription_rel till the transaction end. It
1239 * will simply exit as there is no corresponding rel entry.
1240 *
1241 * This locking also ensures that the state of rels won't
1242 * change till we are done with this refresh operation.
1243 */
1244 if (!rel)
1246
1247 /* Last known rel state. */
1248 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
1249
1250 RemoveSubscriptionRel(sub->oid, relid);
1251
1252 remove_rel->relid = relid;
1253 remove_rel->state = state;
1254
1256
1258
1259 /*
1260 * For READY state, we would have already dropped the
1261 * tablesync origin.
1262 */
1264 {
1265 char originname[NAMEDATALEN];
1266
1267 /*
1268 * Drop the tablesync's origin tracking if it exists.
1269 *
1270 * It is possible that the origin is not yet created for
1271 * tablesync worker, this can happen for the states before
1272 * SUBREL_STATE_DATASYNC. The tablesync worker or apply
1273 * worker can also concurrently try to drop the origin and
1274 * by this time the origin might be already removed. For
1275 * these reasons, passing missing_ok = true.
1276 */
1278 sizeof(originname));
1279 replorigin_drop_by_name(originname, true, false);
1280 }
1281
1283 (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1285 get_rel_name(relid),
1286 sub->name)));
1287 }
1288 }
1289
1290 /*
1291 * Drop the tablesync slots associated with removed tables. This has
1292 * to be at the end because otherwise if there is an error while doing
1293 * the database operations we won't be able to rollback dropped slots.
1294 */
1296 {
1297 if (sub_remove_rel->state != SUBREL_STATE_READY &&
1299 {
1300 char syncslotname[NAMEDATALEN] = {0};
1301
1302 /*
1303 * For READY/SYNCDONE states we know the tablesync slot has
1304 * already been dropped by the tablesync worker.
1305 *
1306 * For other states, there is no certainty, maybe the slot
1307 * does not exist yet. Also, if we fail after removing some of
1308 * the slots, next time, it will again try to drop already
1309 * dropped slots and fail. For these reasons, we allow
1310 * missing_ok = true for the drop.
1311 */
1313 syncslotname, sizeof(syncslotname));
1315 }
1316 }
1317
1318 /*
1319 * Next remove state for sequences we should not care about anymore
1320 * using the data we collected above
1321 */
1322 for (off = 0; off < seq_count; off++)
1323 {
1324 Oid relid = subseq_local_oids[off];
1325
1326 if (!bsearch(&relid, pubrel_local_oids,
1327 list_length(pubrels), sizeof(Oid), oid_cmp))
1328 {
1329 /*
1330 * This locking ensures that the state of rels won't change
1331 * till we are done with this refresh operation.
1332 */
1333 if (!rel)
1335
1336 RemoveSubscriptionRel(sub->oid, relid);
1337
1339 errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
1341 get_rel_name(relid),
1342 sub->name));
1343 }
1344 }
1345 }
1346 PG_FINALLY();
1347 {
1349 }
1350 PG_END_TRY();
1351
1352 if (rel)
1353 table_close(rel, NoLock);
1354}
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define DEBUG1
Definition elog.h:31
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
#define palloc_object(type)
Definition fe_memutils.h:89
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:662
List * lappend(List *list, void *datum)
Definition list.c:339
#define NoLock
Definition lockdefs.h:34
#define AccessShareLock
Definition lockdefs.h:36
char * get_rel_name(Oid relid)
Definition lsyscache.c:2234
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2309
Oid get_rel_namespace(Oid relid)
Definition lsyscache.c:2258
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3674
void * palloc(Size size)
Definition mcxt.c:1390
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition namespace.h:98
int oid_cmp(const void *p1, const void *p2)
Definition oid.c:287
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:459
NameData relname
Definition pg_class.h:40
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
void RemoveSubscriptionRel(Oid subid, Oid relid)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
#define qsort(a, b, c, d)
Definition port.h:496
char * relname
Definition primnodes.h:84
char * schemaname
Definition primnodes.h:81
static void check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static void check_publications(WalReceiverConn *wrconn, List *publications)
static List * fetch_relation_list(WalReceiverConn *wrconn, List *publications)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition tablesync.c:1236
@ WORKERTYPE_TABLESYNC
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References AccessExclusiveLock, AccessShareLock, AddSubscriptionRelState(), check_publications(), check_publications_origin_sequences(), check_publications_origin_tables(), CheckSubscriptionRelkind(), Subscription::conninfo, DEBUG1, ereport, err(), errcode(), errmsg, errmsg_internal(), ERROR, fb(), fetch_relation_list(), foreach_ptr, get_namespace_name(), get_rel_name(), get_rel_namespace(), get_rel_relkind(), GetSubscriptionRelations(), GetSubscriptionRelState(), InvalidXLogRecPtr, lappend(), lfirst, list_length(), load_file(), logicalrep_worker_stop(), Subscription::name, NAMEDATALEN, NIL, NoLock, Subscription::oid, oid_cmp(), Subscription::origin, Subscription::ownersuperuser, palloc(), palloc_object, Subscription::passwordrequired, PG_END_TRY, PG_FINALLY, PG_TRY, Subscription::publications, qsort, RangeVarGetRelid, SubscriptionRelState::relid, RangeVar::relname, RemoveSubscriptionRel(), ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), Subscription::retaindeadtuples, RangeVar::schemaname, table_close(), table_open(), walrcv_connect, walrcv_disconnect, WORKERTYPE_TABLESYNC, and wrconn.

Referenced by AlterSubscription().

◆ AlterSubscription_refresh_seq()

static void AlterSubscription_refresh_seq ( Subscription *sub)
static

Definition at line 1360 of file subscriptioncmds.c.

1361{
1362 char *err = NULL;
1364 bool must_use_password;
1365
1366 /* Load the library providing us libpq calls. */
1367 load_file("libpqwalreceiver", false);
1368
1369 /* Try to connect to the publisher. */
1371 wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1372 sub->name, &err);
1373 if (!wrconn)
1374 ereport(ERROR,
1376 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1377 sub->name, err));
1378
1379 PG_TRY();
1380 {
1382
1384 sub->origin, NULL, 0, sub->name);
1385
1386 /* Get local sequence list. */
1387 subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
1389 {
1390 Oid relid = subrel->relid;
1391
1393 InvalidXLogRecPtr, false);
1395 errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
1397 get_rel_name(relid),
1398 sub->name));
1399 }
1400 }
1401 PG_FINALLY();
1402 {
1404 }
1405 PG_END_TRY();
1406}
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)

References check_publications_origin_sequences(), Subscription::conninfo, DEBUG1, ereport, err(), errcode(), errmsg, errmsg_internal(), ERROR, fb(), foreach_ptr, get_namespace_name(), get_rel_name(), get_rel_namespace(), GetSubscriptionRelations(), InvalidXLogRecPtr, load_file(), Subscription::name, Subscription::oid, Subscription::origin, Subscription::ownersuperuser, Subscription::passwordrequired, PG_END_TRY, PG_FINALLY, PG_TRY, Subscription::publications, UpdateSubscriptionRelState(), walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by AlterSubscription().

◆ AlterSubscriptionOwner()

ObjectAddress AlterSubscriptionOwner ( const char *name,
Oid  newOwnerId 
)

Definition at line 2950 of file subscriptioncmds.c.

2951{
2952 Oid subid;
2953 HeapTuple tup;
2954 Relation rel;
2955 ObjectAddress address;
2957
2959
2962
2963 if (!HeapTupleIsValid(tup))
2964 ereport(ERROR,
2966 errmsg("subscription \"%s\" does not exist", name)));
2967
2969 subid = form->oid;
2970
2972
2974
2976
2978
2979 return address;
2980}
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)

References AlterSubscriptionOwner_internal(), CStringGetDatum(), ereport, errcode(), errmsg, ERROR, fb(), Form_pg_subscription, GETSTRUCT(), heap_freetuple(), HeapTupleIsValid, MyDatabaseId, name, ObjectAddressSet, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy2, table_close(), and table_open().

Referenced by ExecAlterOwnerStmt().

◆ AlterSubscriptionOwner_internal()

static void AlterSubscriptionOwner_internal ( Relation  rel,
HeapTuple  tup,
Oid  newOwnerId 
)
static

Definition at line 2864 of file subscriptioncmds.c.

2865{
2868
2870
2871 if (form->subowner == newOwnerId)
2872 return;
2873
2876 NameStr(form->subname));
2877
2878 /*
2879 * Don't allow non-superuser modification of a subscription with
2880 * password_required=false.
2881 */
2882 if (!form->subpasswordrequired && !superuser())
2883 ereport(ERROR,
2885 errmsg("password_required=false is superuser-only"),
2886 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2887
2888 /* Must be able to become new owner */
2890
2891 /*
2892 * current owner must have CREATE on database
2893 *
2894 * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2895 * other object types behave differently (e.g. you can't give a table to a
2896 * user who lacks CREATE privileges on a schema).
2897 */
2900 if (aclresult != ACLCHECK_OK)
2903
2904 /*
2905 * If the subscription uses a server, check that the new owner has USAGE
2906 * privileges on the server and that a user mapping exists. Note: does not
2907 * re-check the resulting connection string.
2908 */
2909 if (OidIsValid(form->subserver))
2910 {
2911 ForeignServer *server = GetForeignServer(form->subserver);
2912
2914 if (aclresult != ACLCHECK_OK)
2915 ereport(ERROR,
2917 errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
2919 server->servername));
2920
2921 /* make sure a user mapping exists */
2923 }
2924
2925 form->subowner = newOwnerId;
2926 CatalogTupleUpdate(rel, &tup->t_self, tup);
2927
2928 /* Update owner of the conflict log table if it exists. */
2929 if (OidIsValid(form->subconflictlogrelid))
2930 ATExecChangeOwner(form->subconflictlogrelid, newOwnerId, true,
2932
2933 /* Update owner dependency reference */
2935 form->oid,
2936 newOwnerId);
2937
2939 form->oid, 0);
2940
2941 /* Wake up related background processes to handle this change quickly. */
2944}
void check_can_set_role(Oid member, Oid role)
Definition acl.c:5371
#define NameStr(name)
Definition c.h:894
#define OidIsValid(objectId)
Definition c.h:917
ForeignServer * GetForeignServer(Oid serverid)
Definition foreign.c:114
char * get_database_name(Oid dbid)
Definition lsyscache.c:1384
@ OBJECT_DATABASE
#define ACL_CREATE
Definition parsenodes.h:85
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
char * servername
Definition foreign.h:40
void ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lockmode)

References AccessExclusiveLock, ACL_CREATE, ACL_USAGE, aclcheck_error(), ACLCHECK_NOT_OWNER, ACLCHECK_OK, ApplyLauncherWakeupAtCommit(), ATExecChangeOwner(), CatalogTupleUpdate(), changeDependencyOnOwner(), check_can_set_role(), ereport, errcode(), errhint(), errmsg, ERROR, fb(), Form_pg_subscription, get_database_name(), GetForeignServer(), GETSTRUCT(), GetUserId(), GetUserMapping(), GetUserNameFromId(), InvokeObjectPostAlterHook, LogicalRepWorkersWakeupAtCommit(), MyDatabaseId, NameStr, object_aclcheck(), OBJECT_DATABASE, object_ownercheck(), OBJECT_SUBSCRIPTION, OidIsValid, ForeignServer::serverid, ForeignServer::servername, and superuser().

Referenced by AlterSubscriptionOwner(), and AlterSubscriptionOwner_oid().

◆ AlterSubscriptionOwner_oid()

void AlterSubscriptionOwner_oid ( Oid  subid,
Oid  newOwnerId 
)

Definition at line 2986 of file subscriptioncmds.c.

2987{
2988 HeapTuple tup;
2989 Relation rel;
2990
2992
2994
2995 if (!HeapTupleIsValid(tup))
2996 ereport(ERROR,
2998 errmsg("subscription with OID %u does not exist", subid)));
2999
3001
3003
3005}
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91

References AlterSubscriptionOwner_internal(), ereport, errcode(), errmsg, ERROR, fb(), heap_freetuple(), HeapTupleIsValid, ObjectIdGetDatum(), RowExclusiveLock, SearchSysCacheCopy1, table_close(), and table_open().

Referenced by shdepReassignOwned_Owner().

◆ appendQuotedString()

static void appendQuotedString ( StringInfo  buf,
const char *str,
char  quote 
)
static

Definition at line 553 of file subscriptioncmds.c.

554{
556 while (*str)
557 {
558 char c = *str++;
559
560 if (c == quote)
563 }
565}
const char * str
static char buf[DEFAULT_XLOG_SEG_SIZE]
char * c
void appendStringInfoChar(StringInfo str, char ch)
Definition stringinfo.c:242

References appendStringInfoChar(), buf, and str.

◆ check_duplicates_in_publist()

static void check_duplicates_in_publist ( List *publist,
Datum *datums 
)
static

Definition at line 3589 of file subscriptioncmds.c.

3590{
3591 ListCell *cell;
3592 int j = 0;
3593
3594 foreach(cell, publist)
3595 {
3596 char *name = strVal(lfirst(cell));
3597 ListCell *pcell;
3598
3599 foreach(pcell, publist)
3600 {
3601 char *pname = strVal(lfirst(pcell));
3602
3603 if (pcell == cell)
3604 break;
3605
3606 if (strcmp(name, pname) == 0)
3607 ereport(ERROR,
3609 errmsg("publication name \"%s\" used more than once",
3610 pname)));
3611 }
3612
3613 if (datums)
3614 datums[j++] = CStringGetTextDatum(name);
3615 }
3616}
int j
Definition isn.c:78
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
#define strVal(v)
Definition value.h:82

References CStringGetTextDatum, ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg, ERROR, fb(), j, lfirst, name, and strVal.

Referenced by merge_publications().

◆ check_pub_dead_tuple_retention()

static void check_pub_dead_tuple_retention ( WalReceiverConn *wrconn)
static

Definition at line 3296 of file subscriptioncmds.c.

3297{
3298 WalRcvExecResult *res;
3299 Oid RecoveryRow[1] = {BOOLOID};
3300 TupleTableSlot *slot;
3301 bool isnull;
3302 bool remote_in_recovery;
3303
3304 if (walrcv_server_version(wrconn) < 190000)
3305 ereport(ERROR,
3307 errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
3308
3309 res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
3310
3311 if (res->status != WALRCV_OK_TUPLES)
3312 ereport(ERROR,
3314 errmsg("could not obtain recovery progress from the publisher: %s",
3315 res->err)));
3316
3318 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3319 elog(ERROR, "failed to fetch tuple for the recovery progress");
3320
3321 remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
3322
3324 ereport(ERROR,
3326 errmsg("cannot enable retain_dead_tuples if the publisher is in recovery"));
3327
3329
3331}
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
Definition execTuples.c:86
static bool DatumGetBool(Datum X)
Definition postgres.h:100
Tuplestorestate * tuplestore
TupleDesc tupledesc
WalRcvExecStatus status
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition tuptable.h:417
@ WALRCV_OK_TUPLES
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)

References DatumGetBool(), elog, ereport, WalRcvExecResult::err, errcode(), errmsg, ERROR, ExecDropSingleTupleTableSlot(), fb(), MakeSingleTupleTableSlot(), slot_getattr(), WalRcvExecResult::status, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, walrcv_server_version, and wrconn.

Referenced by AlterSubscription().

◆ check_publications()

static void check_publications ( WalReceiverConn *wrconn,
List *publications 
)
static

Definition at line 574 of file subscriptioncmds.c.

575{
576 WalRcvExecResult *res;
577 StringInfoData cmd;
578 TupleTableSlot *slot;
580 Oid tableRow[1] = {TEXTOID};
581
582 initStringInfo(&cmd);
583 appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
584 " pg_catalog.pg_publication t WHERE\n"
585 " t.pubname IN (");
586 GetPublicationsStr(publications, &cmd, true);
587 appendStringInfoChar(&cmd, ')');
588
589 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
590 pfree(cmd.data);
591
592 if (res->status != WALRCV_OK_TUPLES)
594 errmsg("could not receive list of publications from the publisher: %s",
595 res->err));
596
597 publicationsCopy = list_copy(publications);
598
599 /* Process publication(s). */
601 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
602 {
603 char *pubname;
604 bool isnull;
605
606 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
607 Assert(!isnull);
608
609 /* Delete the publication present in publisher from the list. */
611 ExecClearTuple(slot);
612 }
613
615
617
619 {
620 /* Prepare the list of non-existent publication(s) for error message. */
622
624
628 errmsg_plural("publication %s does not exist on the publisher",
629 "publications %s do not exist on the publisher",
631 pubnames.data));
632 }
633}
#define TextDatumGetCString(d)
Definition builtins.h:99
int int int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...) pg_attribute_printf(1
List * list_delete(List *list, void *datum)
Definition list.c:853
List * list_copy(const List *oldlist)
Definition list.c:1573
void pfree(void *pointer)
Definition mcxt.c:1619
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
String * makeString(char *str)
Definition value.c:63

Referenced by AlterSubscription_refresh().

◆ check_publications_origin_sequences()

static void check_publications_origin_sequences ( WalReceiverConn *wrconn,
List *publications,
bool  copydata,
char *origin,
Oid *subrel_local_oids,
int  subrel_count,
char subname 
)
static

Definition at line 3179 of file subscriptioncmds.c.

3183{
3184 WalRcvExecResult *res;
3185 StringInfoData cmd;
3186 TupleTableSlot *slot;
3187 Oid tableRow[1] = {TEXTOID};
3188 List *publist = NIL;
3189
3190 /*
3191 * Enable sequence synchronization checks only when origin is 'none', to
3192 * ensure that sequence data from other origins is not inadvertently
3193 * copied. This check is necessary if the publisher is running PG19 or
3194 * later, where logical replication sequence synchronization is supported.
3195 */
3196 if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0 ||
3197 walrcv_server_version(wrconn) < 190000)
3198 return;
3199
3200 initStringInfo(&cmd);
3202 "SELECT DISTINCT P.pubname AS pubname\n"
3203 "FROM pg_publication P,\n"
3204 " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
3205 " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
3206 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
3207 "WHERE C.oid = GPS.relid AND P.pubname IN (");
3208
3209 GetPublicationsStr(publications, &cmd, true);
3210 appendStringInfoString(&cmd, ")\n");
3211
3212 /*
3213 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
3214 * subrel_local_oids contains the list of relations that are already
3215 * present on the subscriber. The check is skipped for these relations,
3216 * as they will not be re-synced.
3217 */
3218 for (int i = 0; i < subrel_count; i++)
3219 {
3220 Oid relid = subrel_local_oids[i];
3221 char *schemaname = get_namespace_name(get_rel_namespace(relid));
3222 char *seqname = get_rel_name(relid);
3223 char *schemaname_lit = quote_literal_cstr(schemaname);
3224 char *seqname_lit = quote_literal_cstr(seqname);
3225
3226 appendStringInfo(&cmd,
3227 "AND NOT (N.nspname = %s AND C.relname = %s)\n",
3229
3232 }
3233
3234 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
3235 pfree(cmd.data);
3236
3237 if (res->status != WALRCV_OK_TUPLES)
3238 ereport(ERROR,
3240 errmsg("could not receive list of replicated sequences from the publisher: %s",
3241 res->err)));
3242
3243 /* Process publications. */
3245 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3246 {
3247 char *pubname;
3248 bool isnull;
3249
3250 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
3251 Assert(!isnull);
3252
3253 ExecClearTuple(slot);
3255 }
3256
3257 /*
3258 * Log a warning if the publisher has subscribed to the same sequence from
3259 * some other publisher. We cannot know the origin of sequence data
3260 * during the initial sync.
3261 */
3262 if (publist)
3263 {
3265
3266 /* Prepare the list of publication(s) for warning message. */
3269
3272 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
3273 subname),
3274 errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
3275 "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
3277 errhint("Verify that initial data copied from the publisher sequences did not come from other origins."));
3278 }
3279
3281
3283}
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...) pg_attribute_printf(1
int i
Definition isn.c:77
List * list_append_unique(List *list, void *datum)
Definition list.c:1343
NameData subname
char * quote_literal_cstr(const char *rawstr)
Definition quote.c:101
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145

References appendStringInfo(), appendStringInfoString(), Assert, StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errdetail_plural(), errhint(), errmsg, ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), fb(), get_namespace_name(), get_rel_name(), get_rel_namespace(), GetPublicationsStr(), i, initStringInfo(), list_append_unique(), list_length(), MakeSingleTupleTableSlot(), makeString(), NIL, pfree(), pg_strcasecmp(), quote_literal_cstr(), slot_getattr(), WalRcvExecResult::status, subname, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, walrcv_server_version, WARNING, and wrconn.

Referenced by AlterSubscription_refresh(), and AlterSubscription_refresh_seq().

◆ check_publications_origin_tables()

static void check_publications_origin_tables ( WalReceiverConn *wrconn,
List *publications,
bool  copydata,
bool  retain_dead_tuples,
char *origin,
Oid *subrel_local_oids,
int  subrel_count,
char *subname 
)
static

Definition at line 3031 of file subscriptioncmds.c.

3035{
3036 WalRcvExecResult *res;
3037 StringInfoData cmd;
3038 TupleTableSlot *slot;
3039 Oid tableRow[1] = {TEXTOID};
3040 List *publist = NIL;
3041 int i;
3042 bool check_rdt;
3043 bool check_table_sync;
3044 bool origin_none = origin &&
3046
3047 /*
3048 * Enable retain_dead_tuples checks only when origin is set to 'any',
3049 * since with origin='none' only local changes are replicated to the
3050 * subscriber.
3051 */
3053
3054 /*
3055 * Enable table synchronization checks only when origin is 'none', to
3056 * ensure that data from other origins is not inadvertently copied.
3057 */
3059
3060 /* retain_dead_tuples and table sync checks occur separately */
3062
3063 /* Return if no checks are required */
3064 if (!check_rdt && !check_table_sync)
3065 return;
3066
3067 initStringInfo(&cmd);
3069 "SELECT DISTINCT P.pubname AS pubname\n"
3070 "FROM pg_publication P,\n"
3071 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
3072 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
3073 " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
3074 " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
3075 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
3076 "WHERE C.oid = GPT.relid AND P.pubname IN (");
3077 GetPublicationsStr(publications, &cmd, true);
3078 appendStringInfoString(&cmd, ")\n");
3079
3080 /*
3081 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
3082 * subrel_local_oids contains the list of relation oids that are already
3083 * present on the subscriber. This check should be skipped for these
3084 * tables if checking for table sync scenario. However, when handling the
3085 * retain_dead_tuples scenario, ensure all tables are checked, as some
3086 * existing tables may now include changes from other origins due to newly
3087 * created subscriptions on the publisher.
3088 */
3089 if (check_table_sync)
3090 {
3091 for (i = 0; i < subrel_count; i++)
3092 {
3093 Oid relid = subrel_local_oids[i];
3094 char *schemaname = get_namespace_name(get_rel_namespace(relid));
3095 char *tablename = get_rel_name(relid);
3096 char *schemaname_lit = quote_literal_cstr(schemaname);
3097 char *tablename_lit = quote_literal_cstr(tablename);
3098
3099 appendStringInfo(&cmd, "AND NOT (N.nspname = %s AND C.relname = %s)\n",
3101
3104 }
3105 }
3106
3107 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
3108 pfree(cmd.data);
3109
3110 if (res->status != WALRCV_OK_TUPLES)
3111 ereport(ERROR,
3113 errmsg("could not receive list of replicated tables from the publisher: %s",
3114 res->err)));
3115
3116 /* Process publications. */
3118 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3119 {
3120 char *pubname;
3121 bool isnull;
3122
3123 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
3124 Assert(!isnull);
3125
3126 ExecClearTuple(slot);
3128 }
3129
3130 /*
3131 * Log a warning if the publisher has subscribed to the same table from
3132 * some other publisher. We cannot know the origin of data during the
3133 * initial sync. Data origins can be found only from the WAL by looking at
3134 * the origin id.
3135 *
3136 * XXX: For simplicity, we don't check whether the table has any data.
3137 * If the table is empty, there is no need to distinguish between data
3138 * that has an origin and data that does not, so the warning could be
3139 * avoided for the table sync scenario.
3140 */
3141 if (publist)
3142 {
3144
3145 /* Prepare the list of publication(s) for warning message. */
3148
3149 if (check_table_sync)
3152 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
3153 subname),
3154 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
3155 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
3157 errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
3158 else
3161 errmsg("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
3162 subname),
3163 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
3164 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
3166 errhint("Consider using origin = NONE or disabling retain_dead_tuples."));
3167 }
3168
3170
3172}

References appendStringInfo(), appendStringInfoString(), Assert, StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errdetail_plural(), errhint(), errmsg, ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), fb(), get_namespace_name(), get_rel_name(), get_rel_namespace(), GetPublicationsStr(), i, initStringInfo(), list_append_unique(), list_length(), MakeSingleTupleTableSlot(), makeString(), NIL, pfree(), pg_strcasecmp(), quote_literal_cstr(), slot_getattr(), WalRcvExecResult::status, subname, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, WARNING, and wrconn.

Referenced by AlterSubscription(), and AlterSubscription_refresh().

◆ CheckAlterSubOption()

static void CheckAlterSubOption ( Subscription *sub,
const char *option,
bool  slot_needs_update,
bool  isTopLevel 
)
static

Definition at line 1413 of file subscriptioncmds.c.

1415{
1416 Assert(strcmp(option, "failover") == 0 ||
1417 strcmp(option, "two_phase") == 0 ||
1418 strcmp(option, "retain_dead_tuples") == 0);
1419
1420 /*
1421 * Altering the retain_dead_tuples option does not update the slot on the
1422 * publisher.
1423 */
1424 Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1425
1426 /*
1427 * Do not allow changing the option if the subscription is enabled. This
1428 * is because both failover and two_phase options of the slot on the
1429 * publisher cannot be modified if the slot is currently acquired by the
1430 * existing walsender.
1431 *
1432 * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1433 * the publisher by the existing walsender, so we could have allowed that
1434 * even when the subscription is enabled. But we kept this restriction for
1435 * the sake of consistency and simplicity.
1436 *
1437 * Additionally, do not allow changing the retain_dead_tuples option when
1438 * the subscription is enabled to prevent race conditions arising from the
1439 * new option value being acknowledged asynchronously by the launcher and
1440 * apply workers.
1441 *
1442 * Without the restriction, a race condition may arise when a user
1443 * disables and immediately re-enables the retain_dead_tuples option. In
1444 * this case, the launcher might drop the slot upon noticing the disabled
1445 * action, while the apply worker may keep maintaining
1446 * oldest_nonremovable_xid without noticing the option change. During this
1447 * period, a transaction ID wraparound could falsely make this ID appear
1448 * as if it originates from the future w.r.t. the transaction ID stored in
1449 * the slot maintained by the launcher.
1450 *
1451 * Similarly, if the user enables retain_dead_tuples concurrently with the
1452 * launcher starting the worker, the apply worker may start calculating
1453 * oldest_nonremovable_xid before the launcher notices the enable action.
1454 * Consequently, the launcher may update slot.xmin to a newer value than
1455 * that maintained by the worker. In subsequent cycles, upon integrating
1456 * the worker's oldest_nonremovable_xid, the launcher might detect a
1457 * retreat in the calculated xmin, necessitating additional handling.
1458 *
1459 * XXX To address the above race conditions, we could define
1460 * oldest_nonremovable_xid as FullTransactionId and add a check to
1461 * disallow retreating the conflict slot's xmin. For now, we keep the
1462 * implementation simple by disallowing changes to retain_dead_tuples,
1463 * but in the future we can change this after some more analysis.
1464 *
1465 * Note that we could restrict only the enabling of retain_dead_tuples to
1466 * avoid the race conditions described above, but we maintain the
1467 * restriction for both enable and disable operations for the sake of
1468 * consistency.
1469 */
1470 if (sub->enabled)
1471 ereport(ERROR,
1473 errmsg("cannot set option \"%s\" for enabled subscription",
1474 option)));
1475
1477 {
1478 StringInfoData cmd;
1479
1480 /*
1481 * A valid slot must be associated with the subscription for us to
1482 * modify any of the slot's properties.
1483 */
1484 if (!sub->slotname)
1485 ereport(ERROR,
1487 errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1488 option)));
1489
1490 /* The changed option of the slot can't be rolled back. */
1491 initStringInfo(&cmd);
1492 appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1493
1495 pfree(cmd.data);
1496 }
1497}

References appendStringInfo(), Assert, StringInfoData::data, Subscription::enabled, ereport, errcode(), errmsg, ERROR, fb(), initStringInfo(), pfree(), PreventInTransactionBlock(), and Subscription::slotname.

Referenced by AlterSubscription().

◆ CheckSubDeadTupleRetention()

void CheckSubDeadTupleRetention ( bool  check_guc,
bool  sub_disabled,
int  elevel_for_sub_disabled,
bool  retain_dead_tuples,
bool  retention_active,
bool  max_retention_set 
)

Definition at line 3357 of file subscriptioncmds.c.

3361{
3364
3366 {
3368 ereport(ERROR,
3370 errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
3371 errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
3372
3376 errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
3377 errhint("Consider setting \"%s\" to true.",
3378 "track_commit_timestamp"));
3379
3383 errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
3385 ? errhint("Consider setting %s to false.",
3386 "retain_dead_tuples") : 0);
3387 }
3388 else if (max_retention_set)
3389 {
3392 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
3393 }
3394}
bool track_commit_timestamp
Definition commit_ts.c:121
int wal_level
Definition xlog.c:138
@ WAL_LEVEL_REPLICA
Definition xlog.h:77

References Assert, ereport, errcode(), errhint(), errmsg, ERROR, fb(), NOTICE, track_commit_timestamp, wal_level, WAL_LEVEL_REPLICA, and WARNING.

Referenced by AlterSubscription(), and DisableSubscriptionAndExit().

◆ construct_subserver_conninfo()

static char * construct_subserver_conninfo ( Oid  subserver,
Oid  subowner,
char **  err 
)
static

Definition at line 2425 of file subscriptioncmds.c.

2426{
2428 ForeignServer *server;
2429
2430 *err = NULL;
2431
2432 server = GetForeignServer(subserver);
2433
2436 if (aclresult != ACLCHECK_OK)
2437 {
2438 /*
2439 * Unable to generate connection string because permissions on the
2440 * foreign server have been removed. Follow the same logic as an
2441 * unusable subconninfo (which will result in an ERROR later unless
2442 * slot_name = NONE).
2443 */
2444 *err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
2446 server->servername);
2447 return NULL;
2448 }
2449
2451}
#define _(x)
Definition elog.c:96
char * psprintf(const char *fmt,...)
Definition psprintf.c:43

References _, ACL_USAGE, ACLCHECK_OK, err(), fb(), ForeignServerConnectionString(), GetForeignServer(), GetUserNameFromId(), object_aclcheck(), psprintf(), and ForeignServer::servername.

Referenced by DropSubscription().

◆ CreateSubscription()

ObjectAddress CreateSubscription ( ParseState *pstate,
CreateSubscriptionStmt *stmt,
bool  isTopLevel 
)

Definition at line 669 of file subscriptioncmds.c.

671{
672 Relation rel;
674 Oid subid;
675 bool nulls[Natts_pg_subscription];
677 Oid owner = GetUserId();
679 Oid serverid;
680 char *conninfo;
682 List *publications;
684 SubOpts opts = {0};
687
688 /*
689 * Parse and check options.
690 *
691 * Connection and publication should not be specified here.
692 */
704
705 /*
706 * Since creating a replication slot is not transactional, rolling back
707 * the transaction would leave the created replication slot behind. So we
708 * cannot run CREATE SUBSCRIPTION inside a transaction block if it creates
709 * a replication slot.
710 */
711 if (opts.create_slot)
712 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
713
714 /*
715 * We don't want to allow unprivileged users to be able to trigger
716 * attempts to access arbitrary network destinations, so require the user
717 * to have been specifically authorized to create subscriptions.
718 */
722 errmsg("permission denied to create subscription"),
723 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
724 "pg_create_subscription")));
725
726 /*
727 * Since a subscription is a database object, we also check for CREATE
728 * permission on the database.
729 */
731 owner, ACL_CREATE);
732 if (aclresult != ACLCHECK_OK)
735
736 /*
737 * Non-superusers are required to set a password for authentication, and
738 * that password must be used by the target server, but the superuser can
739 * exempt a subscription from this requirement.
740 */
741 if (!opts.passwordrequired && !superuser_arg(owner))
744 errmsg("password_required=false is superuser-only"),
745 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
746
747 /*
748 * If built with appropriate switch, whine when regression-testing
749 * conventions for subscription names are violated.
750 */
751#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
752 if (strncmp(stmt->subname, "regress_", 8) != 0)
753 elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
754#endif
755
757
758 /* Check if name is used */
761 if (OidIsValid(subid))
762 {
765 errmsg("subscription \"%s\" already exists",
766 stmt->subname)));
767 }
768
769 /*
770 * Ensure that system configuration parameters are set appropriately to
771 * support retain_dead_tuples and max_retention_duration.
772 */
774 opts.retaindeadtuples, opts.retaindeadtuples,
775 (opts.maxretention > 0));
776
777 if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
778 opts.slot_name == NULL)
779 opts.slot_name = stmt->subname;
780
781 /* The default for synchronous_commit of subscriptions is off. */
782 if (opts.synchronous_commit == NULL)
783 opts.synchronous_commit = "off";
784
785 /*
786 * The default for wal_receiver_timeout of subscriptions is -1, which
787 * means the value is inherited from the server configuration, command
788 * line, or role/database settings.
789 */
790 if (opts.wal_receiver_timeout == NULL)
791 opts.wal_receiver_timeout = "-1";
792
793 /* Load the library providing us libpq calls. */
794 load_file("libpqwalreceiver", false);
795
796 if (stmt->servername)
797 {
798 ForeignServer *server;
799
800 Assert(!stmt->conninfo);
801 conninfo = NULL;
802
803 server = GetForeignServerByName(stmt->servername, false);
805 if (aclresult != ACLCHECK_OK)
807
808 /* make sure a user mapping exists */
809 GetUserMapping(owner, server->serverid);
810
811 serverid = server->serverid;
812 conninfo = ForeignServerConnectionString(owner, server);
813 }
814 else
815 {
816 Assert(stmt->conninfo);
817
818 serverid = InvalidOid;
819 conninfo = stmt->conninfo;
820 }
821
822 /* Check the connection info string. */
823 walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
824
825 publications = stmt->publication;
826
827 /* Everything ok, form a new tuple. */
828 memset(values, 0, sizeof(values));
829 memset(nulls, false, sizeof(nulls));
830
843 CharGetDatum(opts.twophase ?
851 BoolGetDatum(opts.retaindeadtuples);
853 Int32GetDatum(opts.maxretention);
855 BoolGetDatum(opts.retaindeadtuples);
857 if (!OidIsValid(serverid))
859 CStringGetTextDatum(conninfo);
860 else
861 nulls[Anum_pg_subscription_subconninfo - 1] = true;
862 if (opts.slot_name)
865 else
866 nulls[Anum_pg_subscription_subslotname - 1] = true;
868 CStringGetTextDatum(opts.synchronous_commit);
870 CStringGetTextDatum(opts.wal_receiver_timeout);
872 publicationListToArray(publications);
875
878
879 /*
880 * We create the conflict log table here, if required, so that its
881 * relation OID can be stored when inserting the pg_subscription tuple
882 * below.
883 */
884 if (CONFLICTS_LOGGED_TO_TABLE(opts.conflictlogdest))
885 logrelid = create_conflict_log_table(subid, stmt->subname, owner);
886
887 /* Store table OID in the catalog. */
890
892
893 /* Insert tuple into catalog. */
896
898
900
901 if (stmt->servername)
902 {
904
905 Assert(OidIsValid(serverid));
906
909 }
910
911 /*
912 * Establish an internal dependency between the conflict log table and the
913 * subscription.
914 *
915 * We use DEPENDENCY_INTERNAL to signify that the table's lifecycle is
916 * strictly tied to the subscription, similar to how a TOAST table relates
917 * to its main table or a sequence relates to an identity column.
918 *
919 * This ensures the conflict log table is automatically reaped during a
920 * DROP SUBSCRIPTION via performDeletion().
921 */
922 if (OidIsValid(logrelid))
923 {
925
928 }
929
930 /*
931 * A replication origin is currently created for all subscriptions,
932 * including those that only contain sequences or are otherwise empty.
933 *
934 * XXX: While this is technically unnecessary, optimizing it would require
935 * additional logic to skip origin creation during DDL operations and
936 * apply workers initialization, and to handle origin creation dynamically
937 * when tables are added to the subscription. It is not clear whether
938 * preventing creation of origins is worth additional complexity.
939 */
942
943 /*
944 * Connect to remote side to execute requested commands and fetch table
945 * and sequence info.
946 */
947 if (opts.connect)
948 {
949 char *err;
952
953 /* Try to connect to the publisher. */
954 must_use_password = !superuser_arg(owner) && opts.passwordrequired;
955 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
956 stmt->subname, &err);
957 if (!wrconn)
960 errmsg("subscription \"%s\" could not connect to the publisher: %s",
961 stmt->subname, err)));
962
963 PG_TRY();
964 {
965 bool has_tables = false;
966 List *pubrels;
967 char relation_state;
968
969 check_publications(wrconn, publications);
971 opts.copy_data,
972 opts.retaindeadtuples, opts.origin,
973 NULL, 0, stmt->subname);
975 opts.copy_data, opts.origin,
976 NULL, 0, stmt->subname);
977
978 if (opts.retaindeadtuples)
980
981 /*
982 * Set sync state based on whether we were asked to copy the data
983 * or not.
984 */
986
987 /*
988 * Build local relation status info. Relations are for both tables
989 * and sequences from the publisher.
990 */
991 pubrels = fetch_relation_list(wrconn, publications);
992
994 {
995 Oid relid;
996 char relkind;
997 RangeVar *rv = pubrelinfo->rv;
998
999 relid = RangeVarGetRelid(rv, AccessShareLock, false);
1000 relkind = get_rel_relkind(relid);
1001
1002 /* Check for supported relkind. */
1003 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
1004 rv->schemaname, rv->relname);
1005 has_tables |= (relkind != RELKIND_SEQUENCE);
1007 InvalidXLogRecPtr, true);
1008 }
1009
1010 /*
1011 * If requested, create permanent slot for the subscription. We
1012 * won't use the initial snapshot for anything, so no need to
1013 * export it.
1014 *
1015 * XXX: Similar to origins, it is not clear whether preventing the
1016 * slot creation for empty and sequence-only subscriptions is
1017 * worth additional complexity.
1018 */
1019 if (opts.create_slot)
1020 {
1021 bool twophase_enabled = false;
1022
1023 Assert(opts.slot_name);
1024
1025 /*
1026 * Even if two_phase is set, don't create the slot with
1027 * two-phase enabled. Will enable it once all the tables are
1028 * synced and ready. This avoids race-conditions like prepared
1029 * transactions being skipped due to changes not being applied
1030 * due to checks in should_apply_changes_for_rel() when
1031 * tablesync for the corresponding tables is in progress. See
1032 * comments atop worker.c.
1033 *
1034 * Note that if tables were specified but copy_data is false
1035 * then it is safe to enable two_phase up-front because those
1036 * tables are already initially in READY state. When the
1037 * subscription has no tables, we leave the twophase state as
1038 * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
1039 * PUBLICATION to work.
1040 */
1041 if (opts.twophase && !opts.copy_data && has_tables)
1042 twophase_enabled = true;
1043
1045 opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
1046
1047 if (twophase_enabled)
1049
1051 (errmsg("created replication slot \"%s\" on publisher",
1052 opts.slot_name)));
1053 }
1054 }
1055 PG_FINALLY();
1056 {
1058 }
1059 PG_END_TRY();
1060 }
1061 else
1063 (errmsg("subscription was created, but is not connected"),
1064 errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
1065
1067
1069
1070 /*
1071 * Notify the launcher to start the apply worker if the subscription is
1072 * enabled, or to create the conflict detection slot if retain_dead_tuples
1073 * is enabled.
1074 *
1075 * Creating the conflict detection slot is essential even when the
1076 * subscription is not enabled. This ensures that dead tuples are
1077 * retained, which is necessary for accurately identifying the type of
1078 * conflict during replication.
1079 */
1080 if (opts.enabled || opts.retaindeadtuples)
1082
1084
1085 return myself;
1086}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5314
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition catalog.c:475
int errdetail(const char *fmt,...) pg_attribute_printf(1
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
#define InvokeObjectPostCreateHook(classId, objectId, subId)
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:274
@ OBJECT_FOREIGN_SERVER
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void pgstat_create_subscription(Oid subid)
#define SUBOPT_CREATE_SLOT
#define SUBOPT_CONNECT
bool superuser_arg(Oid roleid)
Definition superuser.c:57
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition syscache.h:111
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition tablesync.c:1681
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23

Referenced by ProcessUtilitySlow().

◆ defGetStreamingMode()

char defGetStreamingMode ( DefElem *def)

Definition at line 3688 of file subscriptioncmds.c.

3689{
3690 /*
3691 * If no parameter value given, assume "true" is meant.
3692 */
3693 if (!def->arg)
3694 return LOGICALREP_STREAM_ON;
3695
3696 /*
3697 * Allow 0, 1, "false", "true", "off", "on" or "parallel".
3698 */
3699 switch (nodeTag(def->arg))
3700 {
3701 case T_Integer:
3702 switch (intVal(def->arg))
3703 {
3704 case 0:
3705 return LOGICALREP_STREAM_OFF;
3706 case 1:
3707 return LOGICALREP_STREAM_ON;
3708 default:
3709 /* otherwise, error out below */
3710 break;
3711 }
3712 break;
3713 default:
3714 {
3715 char *sval = defGetString(def);
3716
3717 /*
3718 * The set of strings accepted here should match up with the
3719 * grammar's opt_boolean_or_string production.
3720 */
3721 if (pg_strcasecmp(sval, "false") == 0 ||
3722 pg_strcasecmp(sval, "off") == 0)
3723 return LOGICALREP_STREAM_OFF;
3724 if (pg_strcasecmp(sval, "true") == 0 ||
3725 pg_strcasecmp(sval, "on") == 0)
3726 return LOGICALREP_STREAM_ON;
3727 if (pg_strcasecmp(sval, "parallel") == 0)
3729 }
3730 break;
3731 }
3732
3733 ereport(ERROR,
3735 errmsg("%s requires a Boolean value or \"parallel\"",
3736 def->defname)));
3737 return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
3738}
char * defGetString(DefElem *def)
Definition define.c:34
#define nodeTag(nodeptr)
Definition nodes.h:137
char * defname
Definition parsenodes.h:863
Node * arg
Definition parsenodes.h:864
#define intVal(v)
Definition value.h:79

References DefElem::arg, defGetString(), DefElem::defname, ereport, errcode(), errmsg, ERROR, fb(), intVal, nodeTag, and pg_strcasecmp().

Referenced by parse_output_parameters(), and parse_subscription_options().

◆ drop_sub_conflict_log_table()

static void drop_sub_conflict_log_table ( Oid  subid,
char *subname,
Oid  subconflictlogrelid 
)
static

Definition at line 2469 of file subscriptioncmds.c.

2470{
2471 /* Drop any dependent conflict log table */
2473 {
2474 ObjectAddress object;
2475 char *conflictrelname;
2476
2478 if (conflictrelname == NULL)
2479 elog(ERROR, "cache lookup failed for relation %u",
2481
2482 /*
2483 * By using PERFORM_DELETION_SKIP_ORIGINAL, we ensure that only the
2484 * conflict log table is deleted while the subscription remains.
2485 */
2490
2492 errmsg("dropped conflict log table \"%s\" for subscription \"%s\"",
2494 subname));
2495 }
2496}
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition dependency.c:279
#define PERFORM_DELETION_SKIP_ORIGINAL
Definition dependency.h:95
#define PERFORM_DELETION_INTERNAL
Definition dependency.h:92
char * get_qualified_objname(Oid nspid, char *objname)
Definition lsyscache.c:3712
@ DROP_CASCADE
Oid subconflictlogrelid

References DROP_CASCADE, elog, ereport, errmsg, ERROR, fb(), get_qualified_objname(), get_rel_name(), NOTICE, ObjectAddressSet, OidIsValid, PERFORM_DELETION_INTERNAL, PERFORM_DELETION_SKIP_ORIGINAL, performDeletion(), subconflictlogrelid, and subname.

Referenced by alter_sub_conflict_log_dest(), and DropSubscription().

◆ DropSubscription()

void DropSubscription ( DropSubscriptionStmt *stmt,
bool  isTopLevel 
)

Definition at line 2502 of file subscriptioncmds.c.

2503{
2504 Relation rel;
2506 HeapTuple tup;
2507 Oid subid;
2508 Oid subowner;
2509 Oid subserver;
2511 char *subconninfo = NULL;
2512 Datum datum;
2513 bool isnull;
2514 char *subname;
2515 char *conninfo = NULL;
2516 char *slotname;
2518 ListCell *lc;
2519 char originname[NAMEDATALEN];
2520 char *err = NULL;
2523 List *rstates;
2524 bool must_use_password;
2525
2526 /*
2527 * The launcher may concurrently start a new worker for this subscription.
2528 * During initialization, the worker checks for subscription validity and
2529 * exits if the subscription has already been dropped. See
2530 * InitializeLogRepWorker.
2531 */
2533
2535 CStringGetDatum(stmt->subname));
2536
2537 if (!HeapTupleIsValid(tup))
2538 {
2539 table_close(rel, NoLock);
2540
2541 if (!stmt->missing_ok)
2542 ereport(ERROR,
2544 errmsg("subscription \"%s\" does not exist",
2545 stmt->subname)));
2546 else
2548 (errmsg("subscription \"%s\" does not exist, skipping",
2549 stmt->subname)));
2550
2551 return;
2552 }
2553
2556 if (!isnull)
2557 subconninfo = TextDatumGetCString(datum);
2558
2560 subid = form->oid;
2561 subowner = form->subowner;
2562 subserver = form->subserver;
2563 subconflictlogrelid = form->subconflictlogrelid;
2564 must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
2565
2566 /* must be owner */
2569 stmt->subname);
2570
2571 /* DROP hook for the subscription being removed */
2573
2574 /*
2575 * Lock the subscription so nobody else can do anything with it (including
2576 * the replication workers).
2577 */
2579
2580 /* Get subname */
2583 subname = pstrdup(NameStr(*DatumGetName(datum)));
2584
2585 /* Get slotname */
2588 if (!isnull)
2589 slotname = pstrdup(NameStr(*DatumGetName(datum)));
2590 else
2591 slotname = NULL;
2592
2593 /*
2594 * Since dropping a replication slot is not transactional, the replication
2595 * slot stays dropped even if the transaction rolls back. So we cannot
2596 * run DROP SUBSCRIPTION inside a transaction block if dropping the
2597 * replication slot. Also, in this case, we report a message for dropping
2598 * the subscription to the cumulative stats system.
2599 *
2600 * XXX The command name should really be something like "DROP SUBSCRIPTION
2601 * of a subscription that is associated with a replication slot", but we
2602 * don't have the proper facilities for that.
2603 */
2604 if (slotname)
2605 PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
2606
2609
2610 /* Remove the tuple from catalog. */
2611 CatalogTupleDelete(rel, &tup->t_self);
2612
2614
2615 /*
2616 * Stop all the subscription workers immediately.
2617 *
2618 * This is necessary if we are dropping the replication slot, so that the
2619 * slot becomes accessible.
2620 *
2621 * It is also necessary if the subscription is disabled and was disabled
2622 * in the same transaction. Then the workers haven't seen the disabling
2623 * yet and will still be running, leading to hangs later when we want to
2624 * drop the replication origin. If the subscription was disabled before
2625 * this transaction, then there shouldn't be any workers left, so this
2626 * won't make a difference.
2627 *
2628 * New workers won't be started because we hold an exclusive lock on the
2629 * subscription till the end of the transaction.
2630 */
2631 subworkers = logicalrep_workers_find(subid, false, true);
2632 foreach(lc, subworkers)
2633 {
2635
2637 }
2639
2640 /*
2641 * Remove the no-longer-useful entry in the launcher's table of apply
2642 * worker start times.
2643 *
2644 * If this transaction rolls back, the launcher might restart a failed
2645 * apply worker before wal_retrieve_retry_interval milliseconds have
2646 * elapsed, but that's pretty harmless.
2647 */
2649
2650 /*
2651 * Cleanup of tablesync replication origins.
2652 *
2653 * Any READY-state relations would already have dealt with clean-ups.
2654 *
2655 * Note that the state can't change because we have already stopped both
2656 * the apply and tablesync workers and they can't restart because of
2657 * exclusive lock on the subscription.
2658 */
2659 rstates = GetSubscriptionRelations(subid, true, false, true);
2660 foreach(lc, rstates)
2661 {
2663 Oid relid = rstate->relid;
2664
2665 /* Only cleanup resources of tablesync workers */
2666 if (!OidIsValid(relid))
2667 continue;
2668
2669 /*
2670 * Drop the tablesync's origin tracking if exists.
2671 *
2672 * It is possible that the origin is not yet created for tablesync
2673 * worker so passing missing_ok = true. This can happen for the states
2674 * before SUBREL_STATE_DATASYNC.
2675 */
2677 sizeof(originname));
2678 replorigin_drop_by_name(originname, true, false);
2679 }
2680
2681 /* Drop subscription's conflict log table */
2683
2684 /* Clean up dependencies */
2687
2688 /* Remove any associated relation synchronization states. */
2690
2691 /* Remove the origin tracking if exists. */
2693 replorigin_drop_by_name(originname, true, false);
2694
2695 /*
2696 * Tell the cumulative stats system that the subscription is getting
2697 * dropped.
2698 */
2700
2701 /*
2702 * If there is no slot associated with the subscription, we can finish
2703 * here.
2704 */
2705 if (!slotname && rstates == NIL)
2706 {
2707 table_close(rel, NoLock);
2708 return;
2709 }
2710
2711 /*
2712 * Try to acquire the connection necessary for dropping slots.
2713 *
2714 * Note: If the slotname is NONE/NULL then we allow the command to finish
2715 * and users need to manually cleanup the apply and tablesync worker slots
2716 * later.
2717 *
2718 * This has to be at the end because otherwise if there is an error while
2719 * doing the database operations we won't be able to rollback dropped
2720 * slot.
2721 */
2722 load_file("libpqwalreceiver", false);
2723
2724 if (OidIsValid(subserver))
2726 else
2727 conninfo = subconninfo;
2728
2729 if (conninfo)
2730 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2731 subname, &err);
2732
2733 if (wrconn == NULL)
2734 {
2735 if (!slotname)
2736 {
2737 /* be tidy */
2739 table_close(rel, NoLock);
2740 return;
2741 }
2742 else
2743 {
2744 ReportSlotConnectionError(rstates, subid, slotname, err);
2745 }
2746 }
2747
2748 PG_TRY();
2749 {
2750 foreach(lc, rstates)
2751 {
2753 Oid relid = rstate->relid;
2754
2755 /* Only cleanup resources of tablesync workers */
2756 if (!OidIsValid(relid))
2757 continue;
2758
2759 /*
2760 * Drop the tablesync slots associated with removed tables.
2761 *
2762 * For SYNCDONE/READY states, the tablesync slot is known to have
2763 * already been dropped by the tablesync worker.
2764 *
2765 * For other states, there is no certainty, maybe the slot does
2766 * not exist yet. Also, if we fail after removing some of the
2767 * slots, next time, it will again try to drop already dropped
2768 * slots and fail. For these reasons, we allow missing_ok = true
2769 * for the drop.
2770 */
2771 if (rstate->state != SUBREL_STATE_SYNCDONE)
2772 {
2773 char syncslotname[NAMEDATALEN] = {0};
2774
2776 sizeof(syncslotname));
2778 }
2779 }
2780
2782
2783 /*
2784 * If there is a slot associated with the subscription, then drop the
2785 * replication slot at the publisher.
2786 */
2787 if (slotname)
2788 ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2789 }
2790 PG_FINALLY();
2791 {
2793 }
2794 PG_END_TRY();
2795
2796 table_close(rel, NoLock);
2797}
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1155
void list_free(List *list)
Definition list.c:1546
char * pstrdup(const char *in)
Definition mcxt.c:1910
#define InvokeObjectDropHook(classId, objectId, subId)
long deleteDependencyRecordsFor(Oid classId, Oid objectId, bool skipExtensionDeps)
Definition pg_depend.c:314
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void pgstat_drop_subscription(Oid subid)
static Name DatumGetName(Datum X)
Definition postgres.h:393
LogicalRepWorkerType type
static char * construct_subserver_conninfo(Oid subserver, Oid subowner, char **err)
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache2(SysCacheIdentifier cacheId, Datum key1, Datum key2)
Definition syscache.c:231
Datum SysCacheGetAttrNotNull(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition syscache.c:626
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:596

References AccessExclusiveLock, aclcheck_error(), ACLCHECK_NOT_OWNER, ApplyLauncherForgetWorkerStartTime(), CatalogTupleDelete(), construct_subserver_conninfo(), CStringGetDatum(), DatumGetName(), deleteDependencyRecordsFor(), deleteSharedDependencyRecordsFor(), drop_sub_conflict_log_table(), ereport, err(), errcode(), errmsg, ERROR, EventTriggerSQLDropAddObject(), fb(), Form_pg_subscription, GETSTRUCT(), GetSubscriptionRelations(), GetUserId(), HeapTupleIsValid, InvalidOid, InvokeObjectDropHook, lfirst, list_free(), load_file(), LockSharedObject(), logicalrep_worker_stop(), logicalrep_workers_find(), MyDatabaseId, NAMEDATALEN, NameStr, NIL, NoLock, NOTICE, object_ownercheck(), OBJECT_SUBSCRIPTION, ObjectAddressSet, ObjectIdGetDatum(), OidIsValid, PG_END_TRY, PG_FINALLY, PG_TRY, pgstat_drop_subscription(), PreventInTransactionBlock(), pstrdup(), ReleaseSysCache(), SubscriptionRelState::relid, LogicalRepWorker::relid, RemoveSubscriptionRel(), ReplicationOriginNameForLogicalRep(), ReplicationSlotDropAtPubNode(), ReplicationSlotNameForTablesync(), replorigin_drop_by_name(), ReportSlotConnectionError(), RowExclusiveLock, SearchSysCache2(), SubscriptionRelState::state, stmt, subconflictlogrelid, LogicalRepWorker::subid, subname, superuser_arg(), SysCacheGetAttr(), SysCacheGetAttrNotNull(), table_close(), table_open(), TextDatumGetCString, LogicalRepWorker::type, walrcv_connect, walrcv_disconnect, and wrconn.

Referenced by ProcessUtilitySlow().

◆ fetch_relation_list()

static List * fetch_relation_list ( WalReceiverConn *wrconn,
List *publications
)
static

Definition at line 3421 of file subscriptioncmds.c.

3422{
3423 WalRcvExecResult *res;
3424 StringInfoData cmd;
3425 TupleTableSlot *slot;
3429 bool check_columnlist = (server_version >= 150000);
3430 int column_count = check_columnlist ? 4 : 3;
3431 StringInfoData pub_names;
3432
3433 initStringInfo(&cmd);
3434 initStringInfo(&pub_names);
3435
3436 /* Build the pub_names comma-separated string. */
3437 GetPublicationsStr(publications, &pub_names, true);
3438
3439 /* Get the list of relations from the publisher */
3440 if (server_version >= 160000)
3441 {
3443
3444 /*
3445 * From version 16, we allowed passing multiple publications to the
3446 * function pg_get_publication_tables. This helped to filter out the
3447 * partition table whose ancestor is also published in this
3448 * publication array.
3449 *
3450 * Join pg_get_publication_tables with pg_publication to exclude
3451 * non-existing publications.
3452 *
3453 * Note that attrs are always stored in sorted order so we don't need
3454 * to worry if different publications have specified them in a
3455 * different order. See pub_collist_validate.
3456 */
3457 appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
3458 " FROM pg_class c\n"
3459 " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
3460 " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
3461 " FROM pg_publication\n"
3462 " WHERE pubname IN ( %s )) AS gpt\n"
3463 " ON gpt.relid = c.oid\n",
3464 pub_names.data);
3465
3466 /* From version 19, inclusion of sequences in the target is supported */
3467 if (server_version >= 190000)
3468 appendStringInfo(&cmd,
3469 "UNION ALL\n"
3470 " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
3471 " FROM pg_catalog.pg_publication_sequences s\n"
3472 " WHERE s.pubname IN ( %s )",
3473 pub_names.data);
3474 }
3475 else
3476 {
3478 appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
3479
3480 /* Get column lists for each relation if the publisher supports it */
3481 if (check_columnlist)
3482 appendStringInfoString(&cmd, ", t.attnames\n");
3483
3484 appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
3485 " WHERE t.pubname IN ( %s )",
3486 pub_names.data);
3487 }
3488
3489 pfree(pub_names.data);
3490
3492 pfree(cmd.data);
3493
3494 if (res->status != WALRCV_OK_TUPLES)
3495 ereport(ERROR,
3497 errmsg("could not receive list of replicated tables from the publisher: %s",
3498 res->err)));
3499
3500 /* Process tables. */
3502 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3503 {
3504 char *nspname;
3505 char *relname;
3506 bool isnull;
3507 char relkind;
3509
3510 nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
3511 Assert(!isnull);
3512 relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
3513 Assert(!isnull);
3514 relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
3515 Assert(!isnull);
3516
3517 relinfo->rv = makeRangeVar(nspname, relname, -1);
3518 relinfo->relkind = relkind;
3519
3520 if (relkind != RELKIND_SEQUENCE &&
3523 ereport(ERROR,
3525 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
3526 nspname, relname));
3527 else
3529
3530 ExecClearTuple(slot);
3531 }
3533
3535
3536 return relationlist;
3537}
#define CppAsString2(x)
Definition c.h:565
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition makefuncs.c:473
static int server_version
Definition pg_dumpall.c:109
static char DatumGetChar(Datum X)
Definition postgres.h:122
static bool list_member_rangevar(const List *list, RangeVar *rv)

References appendStringInfo(), appendStringInfoString(), Assert, CppAsString2, StringInfoData::data, DatumGetChar(), ereport, WalRcvExecResult::err, errcode(), errmsg, ERROR, ExecClearTuple(), ExecDropSingleTupleTableSlot(), fb(), GetPublicationsStr(), initStringInfo(), InvalidOid, lappend(), list_member_rangevar(), makeRangeVar(), MakeSingleTupleTableSlot(), NIL, palloc_object, pfree(), relname, server_version, slot_getattr(), WalRcvExecResult::status, TextDatumGetCString, TTSOpsMinimalTuple, WalRcvExecResult::tupledesc, WalRcvExecResult::tuplestore, tuplestore_gettupleslot(), walrcv_clear_result(), walrcv_exec, WALRCV_OK_TUPLES, walrcv_server_version, and wrconn.

Referenced by AlterSubscription_refresh().

◆ list_member_rangevar()

static bool list_member_rangevar ( const List *list,
RangeVar *rv
)
static

Definition at line 3400 of file subscriptioncmds.c.

3401{
3403 {
3404 if (equal(relinfo->rv, rv))
3405 return true;
3406 }
3407
3408 return false;
3409}
bool equal(const void *a, const void *b)
Definition equalfuncs.c:223

References equal(), fb(), and foreach_ptr.

Referenced by fetch_relation_list().

◆ merge_publications()

static List * merge_publications ( List *oldpublist,
List *newpublist,
bool  addpub,
const char *subname
)
static

Definition at line 3629 of file subscriptioncmds.c.

3630{
3631 ListCell *lc;
3632
3634
3636
3637 foreach(lc, newpublist)
3638 {
3639 char *name = strVal(lfirst(lc));
3640 ListCell *lc2;
3641 bool found = false;
3642
3643 foreach(lc2, oldpublist)
3644 {
3645 char *pubname = strVal(lfirst(lc2));
3646
3647 if (strcmp(name, pubname) == 0)
3648 {
3649 found = true;
3650 if (addpub)
3651 ereport(ERROR,
3653 errmsg("publication \"%s\" is already in subscription \"%s\"",
3654 name, subname)));
3655 else
3657
3658 break;
3659 }
3660 }
3661
3662 if (addpub && !found)
3664 else if (!addpub && !found)
3665 ereport(ERROR,
3667 errmsg("publication \"%s\" is not in subscription \"%s\"",
3668 name, subname)));
3669 }
3670
3671 /*
3672 * XXX Probably no strong reason for this, but for now it's to make ALTER
3673 * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
3674 */
3675 if (!oldpublist)
3676 ereport(ERROR,
3678 errmsg("cannot drop all the publications from a subscription")));
3679
3680 return oldpublist;
3681}
#define foreach_delete_current(lst, var_or_cell)
Definition pg_list.h:423
static void check_duplicates_in_publist(List *publist, Datum *datums)

References check_duplicates_in_publist(), ereport, errcode(), ERRCODE_DUPLICATE_OBJECT, errmsg, ERROR, fb(), foreach_delete_current, lappend(), lfirst, list_copy(), makeString(), name, strVal, and subname.

Referenced by AlterSubscription().

◆ parse_subscription_options()

static void parse_subscription_options ( ParseState *pstate,
List *stmt_options,
uint32  supported_opts,
SubOpts *opts
)
static

Definition at line 161 of file subscriptioncmds.c.

163{
164 ListCell *lc;
165
166 /* Start out with cleared opts. */
167 memset(opts, 0, sizeof(SubOpts));
168
169 /* caller must expect some option */
171
172 /* If connect option is supported, these others also need to be. */
176
177 /* Set default values for the supported options. */
179 opts->connect = true;
181 opts->enabled = true;
183 opts->create_slot = true;
185 opts->copy_data = true;
187 opts->refresh = true;
189 opts->binary = false;
191 opts->streaming = LOGICALREP_STREAM_PARALLEL;
193 opts->twophase = false;
195 opts->disableonerr = false;
197 opts->passwordrequired = true;
199 opts->runasowner = false;
201 opts->failover = false;
203 opts->retaindeadtuples = false;
205 opts->maxretention = 0;
209 opts->conflictlogdest = CONFLICT_LOG_DEST_LOG;
210
211 /* Parse options */
212 foreach(lc, stmt_options)
213 {
214 DefElem *defel = (DefElem *) lfirst(lc);
215
217 strcmp(defel->defname, "connect") == 0)
218 {
219 if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
221
222 opts->specified_opts |= SUBOPT_CONNECT;
223 opts->connect = defGetBoolean(defel);
224 }
226 strcmp(defel->defname, "enabled") == 0)
227 {
228 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
230
231 opts->specified_opts |= SUBOPT_ENABLED;
232 opts->enabled = defGetBoolean(defel);
233 }
235 strcmp(defel->defname, "create_slot") == 0)
236 {
237 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
239
240 opts->specified_opts |= SUBOPT_CREATE_SLOT;
241 opts->create_slot = defGetBoolean(defel);
242 }
244 strcmp(defel->defname, "slot_name") == 0)
245 {
246 if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
248
249 opts->specified_opts |= SUBOPT_SLOT_NAME;
250 opts->slot_name = defGetString(defel);
251
252 /* Setting slot_name = NONE is treated as no slot name. */
253 if (strcmp(opts->slot_name, "none") == 0)
254 opts->slot_name = NULL;
255 else
256 ReplicationSlotValidateName(opts->slot_name, false, ERROR);
257 }
259 strcmp(defel->defname, "copy_data") == 0)
260 {
261 if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
263
264 opts->specified_opts |= SUBOPT_COPY_DATA;
265 opts->copy_data = defGetBoolean(defel);
266 }
268 strcmp(defel->defname, "synchronous_commit") == 0)
269 {
270 if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
272
273 opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
274 opts->synchronous_commit = defGetString(defel);
275
276 /* Test if the given value is valid for synchronous_commit GUC. */
277 (void) set_config_option("synchronous_commit", opts->synchronous_commit,
279 false, 0, false);
280 }
282 strcmp(defel->defname, "refresh") == 0)
283 {
284 if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
286
287 opts->specified_opts |= SUBOPT_REFRESH;
288 opts->refresh = defGetBoolean(defel);
289 }
291 strcmp(defel->defname, "binary") == 0)
292 {
293 if (IsSet(opts->specified_opts, SUBOPT_BINARY))
295
296 opts->specified_opts |= SUBOPT_BINARY;
297 opts->binary = defGetBoolean(defel);
298 }
300 strcmp(defel->defname, "streaming") == 0)
301 {
302 if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
304
305 opts->specified_opts |= SUBOPT_STREAMING;
306 opts->streaming = defGetStreamingMode(defel);
307 }
309 strcmp(defel->defname, "two_phase") == 0)
310 {
311 if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
313
314 opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
315 opts->twophase = defGetBoolean(defel);
316 }
318 strcmp(defel->defname, "disable_on_error") == 0)
319 {
320 if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
322
323 opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
324 opts->disableonerr = defGetBoolean(defel);
325 }
327 strcmp(defel->defname, "password_required") == 0)
328 {
329 if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
331
332 opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
333 opts->passwordrequired = defGetBoolean(defel);
334 }
336 strcmp(defel->defname, "run_as_owner") == 0)
337 {
338 if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
340
341 opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
342 opts->runasowner = defGetBoolean(defel);
343 }
345 strcmp(defel->defname, "failover") == 0)
346 {
347 if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
349
350 opts->specified_opts |= SUBOPT_FAILOVER;
351 opts->failover = defGetBoolean(defel);
352 }
354 strcmp(defel->defname, "retain_dead_tuples") == 0)
355 {
356 if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
358
359 opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
360 opts->retaindeadtuples = defGetBoolean(defel);
361 }
363 strcmp(defel->defname, "max_retention_duration") == 0)
364 {
365 if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
367
368 opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
369 opts->maxretention = defGetInt32(defel);
370
371 if (opts->maxretention < 0)
374 errmsg("max_retention_duration cannot be negative"));
375 }
377 strcmp(defel->defname, "origin") == 0)
378 {
379 if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
381
382 opts->specified_opts |= SUBOPT_ORIGIN;
383 pfree(opts->origin);
384
385 /*
386 * Even though the "origin" parameter allows only "none" and "any"
387 * values, it is implemented as a string type so that the
388 * parameter can be extended in future versions to support
389 * filtering using origin names specified by the user.
390 */
391 opts->origin = defGetString(defel);
392
393 if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
397 errmsg("unrecognized origin value: \"%s\"", opts->origin));
398 }
399 else if (IsSet(supported_opts, SUBOPT_LSN) &&
400 strcmp(defel->defname, "lsn") == 0)
401 {
402 char *lsn_str = defGetString(defel);
403 XLogRecPtr lsn;
404
405 if (IsSet(opts->specified_opts, SUBOPT_LSN))
407
408 /* Setting lsn = NONE is treated as resetting LSN */
409 if (strcmp(lsn_str, "none") == 0)
410 lsn = InvalidXLogRecPtr;
411 else
412 {
413 /* Parse the argument as LSN */
416
417 if (!XLogRecPtrIsValid(lsn))
420 errmsg("invalid WAL location (LSN): %s", lsn_str)));
421 }
422
423 opts->specified_opts |= SUBOPT_LSN;
424 opts->lsn = lsn;
425 }
427 strcmp(defel->defname, "wal_receiver_timeout") == 0)
428 {
429 bool parsed;
430 int val;
431
432 if (IsSet(opts->specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
434
435 opts->specified_opts |= SUBOPT_WAL_RECEIVER_TIMEOUT;
436 opts->wal_receiver_timeout = defGetString(defel);
437
438 /*
439 * Test if the given value is valid for wal_receiver_timeout GUC.
440 * Skip this test if the value is -1, since -1 is allowed for the
441 * wal_receiver_timeout subscription option, but not for the GUC
442 * itself.
443 */
444 parsed = parse_int(opts->wal_receiver_timeout, &val, 0, NULL);
445 if (!parsed || val != -1)
446 (void) set_config_option("wal_receiver_timeout", opts->wal_receiver_timeout,
448 false, 0, false);
449 }
451 strcmp(defel->defname, "conflict_log_destination") == 0)
452 {
453 char *val;
454
455 if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_LOG_DEST))
457
459 opts->conflictlogdest = GetConflictLogDest(val);
460 opts->specified_opts |= SUBOPT_CONFLICT_LOG_DEST;
461 }
462 else
465 errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
466 }
467
468 /*
469 * We've been explicitly asked to not connect, that requires some
470 * additional processing.
471 */
472 if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
473 {
474 /* Check for incompatible options from the user. */
475 if (opts->enabled &&
476 IsSet(opts->specified_opts, SUBOPT_ENABLED))
479 /*- translator: both %s are strings of the form "option = value" */
480 errmsg("%s and %s are mutually exclusive options",
481 "connect = false", "enabled = true")));
482
483 if (opts->create_slot &&
484 IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
487 errmsg("%s and %s are mutually exclusive options",
488 "connect = false", "create_slot = true")));
489
490 if (opts->copy_data &&
491 IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
494 errmsg("%s and %s are mutually exclusive options",
495 "connect = false", "copy_data = true")));
496
497 /* Change the defaults of other options. */
498 opts->enabled = false;
499 opts->create_slot = false;
500 opts->copy_data = false;
501 }
502
503 /*
504 * Do additional checking for disallowed combination when slot_name = NONE
505 * was used.
506 */
507 if (!opts->slot_name &&
508 IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
509 {
510 if (opts->enabled)
511 {
512 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
515 /*- translator: both %s are strings of the form "option = value" */
516 errmsg("%s and %s are mutually exclusive options",
517 "slot_name = NONE", "enabled = true")));
518 else
521 /*- translator: both %s are strings of the form "option = value" */
522 errmsg("subscription with %s must also set %s",
523 "slot_name = NONE", "enabled = false")));
524 }
525
526 if (opts->create_slot)
527 {
528 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
531 /*- translator: both %s are strings of the form "option = value" */
532 errmsg("%s and %s are mutually exclusive options",
533 "slot_name = NONE", "create_slot = true")));
534 else
537 /*- translator: both %s are strings of the form "option = value" */
538 errmsg("subscription with %s must also set %s",
539 "slot_name = NONE", "create_slot = false")));
540 }
541 }
542}
@ CONFLICT_LOG_DEST_LOG
Definition conflict.h:91
int32 defGetInt32(DefElem *def)
Definition define.c:148
bool defGetBoolean(DefElem *def)
Definition define.c:93
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition define.c:370
bool parse_int(const char *value, int *result, int flags, const char **hintmsg)
Definition guc.c:2775
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition guc.c:3248
@ GUC_ACTION_SET
Definition guc.h:203
@ PGC_S_TEST
Definition guc.h:125
@ PGC_BACKEND
Definition guc.h:77
long val
Definition informix.c:689
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition pg_lsn.c:64
static XLogRecPtr DatumGetLSN(Datum X)
Definition pg_lsn.h:25
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:265
char defGetStreamingMode(DefElem *def)

References Assert, CONFLICT_LOG_DEST_LOG, CStringGetDatum(), DatumGetLSN(), defGetBoolean(), defGetInt32(), defGetStreamingMode(), defGetString(), DirectFunctionCall1, ereport, errcode(), errmsg, ERROR, errorConflictingDefElem(), fb(), GetConflictLogDest(), GUC_ACTION_SET, InvalidXLogRecPtr, IsSet, lfirst, opts, parse_int(), pfree(), pg_lsn_in(), pg_strcasecmp(), PGC_BACKEND, PGC_S_TEST, pstrdup(), ReplicationSlotValidateName(), set_config_option(), SUBOPT_BINARY, SUBOPT_CONFLICT_LOG_DEST, SUBOPT_CONNECT, SUBOPT_COPY_DATA, SUBOPT_CREATE_SLOT, SUBOPT_DISABLE_ON_ERR, SUBOPT_ENABLED, SUBOPT_FAILOVER, SUBOPT_LSN, SUBOPT_MAX_RETENTION_DURATION, SUBOPT_ORIGIN, SUBOPT_PASSWORD_REQUIRED, SUBOPT_REFRESH, SUBOPT_RETAIN_DEAD_TUPLES, SUBOPT_RUN_AS_OWNER, SUBOPT_SLOT_NAME, SUBOPT_STREAMING, SUBOPT_SYNCHRONOUS_COMMIT, SUBOPT_TWOPHASE_COMMIT, SUBOPT_WAL_RECEIVER_TIMEOUT, val, and XLogRecPtrIsValid.

Referenced by AlterSubscription().

◆ publicationListToArray()

static Datum publicationListToArray ( List *publist)
static

Definition at line 639 of file subscriptioncmds.c.

640{
641 ArrayType *arr;
642 Datum *datums;
643 MemoryContext memcxt;
645
646 /* Create memory context for temporary allocations. */
648 "publicationListToArray to array",
651
653
655
657
659
660 MemoryContextDelete(memcxt);
661
662 return PointerGetDatum(arr);
663}
ArrayType * construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
#define palloc_array(type, count)
Definition fe_memutils.h:91
MemoryContext CurrentMemoryContext
Definition mcxt.c:161
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:475
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:138
#define PointerGetDatum(X)
Definition postgres.h:354

Referenced by AlterSubscription().

◆ ReplicationSlotDropAtPubNode()

void ReplicationSlotDropAtPubNode ( WalReceiverConn *wrconn,
char *slotname,
bool  missing_ok 
)

Definition at line 2807 of file subscriptioncmds.c.

2808{
2809 StringInfoData cmd;
2810
2811 Assert(wrconn);
2812
2813 load_file("libpqwalreceiver", false);
2814
2815 initStringInfo(&cmd);
2816 appendStringInfoString(&cmd, "DROP_REPLICATION_SLOT ");
2817 appendQuotedIdentifier(&cmd, slotname);
2818 appendStringInfoString(&cmd, " WAIT");
2819
2820 PG_TRY();
2821 {
2822 WalRcvExecResult *res;
2823
2824 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2825
2826 if (res->status == WALRCV_OK_COMMAND)
2827 {
2828 /* NOTICE. Success. */
2830 (errmsg("dropped replication slot \"%s\" on publisher",
2831 slotname)));
2832 }
2833 else if (res->status == WALRCV_ERROR &&
2834 missing_ok &&
2836 {
2837 /* LOG. Error, but missing_ok = true. */
2838 ereport(LOG,
2839 (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2840 slotname, res->err)));
2841 }
2842 else
2843 {
2844 /* ERROR. */
2845 ereport(ERROR,
2847 errmsg("could not drop replication slot \"%s\" on publisher: %s",
2848 slotname, res->err)));
2849 }
2850
2852 }
2853 PG_FINALLY();
2854 {
2855 pfree(cmd.data);
2856 }
2857 PG_END_TRY();
2858}
#define LOG
Definition elog.h:32
#define appendQuotedIdentifier(b, s)
@ WALRCV_OK_COMMAND
@ WALRCV_ERROR

References appendQuotedIdentifier, appendStringInfoString(), Assert, StringInfoData::data, ereport, WalRcvExecResult::err, errcode(), errmsg, ERROR, fb(), initStringInfo(), load_file(), LOG, NOTICE, pfree(), PG_END_TRY, PG_FINALLY, PG_TRY, WalRcvExecResult::sqlstate, WalRcvExecResult::status, walrcv_clear_result(), WALRCV_ERROR, walrcv_exec, WALRCV_OK_COMMAND, and wrconn.

Referenced by AlterSubscription_refresh(), DropSubscription(), LogicalRepSyncTableStart(), and ProcessSyncingTablesForSync().

◆ ReportSlotConnectionError()

static void ReportSlotConnectionError ( List *rstates,
Oid  subid,
char *slotname,
char *err
)
static

Definition at line 3545 of file subscriptioncmds.c.

3546{
3547 ListCell *lc;
3548
3549 foreach(lc, rstates)
3550 {
3552 Oid relid = rstate->relid;
3553
3554 /* Only cleanup resources of tablesync workers */
3555 if (!OidIsValid(relid))
3556 continue;
3557
3558 /*
3559 * Caller needs to ensure that relstate doesn't change underneath us.
3560 * See DropSubscription where we get the relstates.
3561 */
3562 if (rstate->state != SUBREL_STATE_SYNCDONE)
3563 {
3564 char syncslotname[NAMEDATALEN] = {0};
3565
3567 sizeof(syncslotname));
3568 elog(WARNING, "could not drop tablesync replication slot \"%s\"",
3569 syncslotname);
3570 }
3571 }
3572
3573 ereport(ERROR,
3575 errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
3576 slotname, err),
3577 /* translator: %s is an SQL ALTER command */
3578 errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
3579 "ALTER SUBSCRIPTION ... DISABLE",
3580 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
3581}

References elog, ereport, err(), errcode(), errhint(), errmsg, ERROR, fb(), lfirst, NAMEDATALEN, OidIsValid, SubscriptionRelState::relid, ReplicationSlotNameForTablesync(), SubscriptionRelState::state, and WARNING.

Referenced by DropSubscription().