28#include "catalog/pg_authid_d.h"
29#include "catalog/pg_database_d.h"
63#define SUBOPT_CONNECT 0x00000001
64#define SUBOPT_ENABLED 0x00000002
65#define SUBOPT_CREATE_SLOT 0x00000004
66#define SUBOPT_SLOT_NAME 0x00000008
67#define SUBOPT_COPY_DATA 0x00000010
68#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
69#define SUBOPT_REFRESH 0x00000040
70#define SUBOPT_BINARY 0x00000080
71#define SUBOPT_STREAMING 0x00000100
72#define SUBOPT_TWOPHASE_COMMIT 0x00000200
73#define SUBOPT_DISABLE_ON_ERR 0x00000400
74#define SUBOPT_PASSWORD_REQUIRED 0x00000800
75#define SUBOPT_RUN_AS_OWNER 0x00001000
76#define SUBOPT_FAILOVER 0x00002000
77#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
78#define SUBOPT_MAX_RETENTION_DURATION 0x00008000
79#define SUBOPT_WAL_RECEIVER_TIMEOUT 0x00010000
80#define SUBOPT_LSN 0x00020000
81#define SUBOPT_ORIGIN 0x00040000
84#define IsSet(val, bits) (((val) & (bits)) == (bits))
170 opts->connect =
true;
172 opts->enabled =
true;
174 opts->create_slot =
true;
176 opts->copy_data =
true;
178 opts->refresh =
true;
180 opts->binary =
false;
184 opts->twophase =
false;
186 opts->disableonerr =
false;
188 opts->passwordrequired =
true;
190 opts->runasowner =
false;
192 opts->failover =
false;
194 opts->retaindeadtuples =
false;
196 opts->maxretention = 0;
352 strcmp(
defel->defname,
"max_retention_duration") == 0)
360 if (
opts->maxretention < 0)
363 errmsg(
"max_retention_duration cannot be negative"));
386 errmsg(
"unrecognized origin value: \"%s\"",
opts->origin));
416 strcmp(
defel->defname,
"wal_receiver_timeout") == 0)
442 errmsg(
"unrecognized subscription parameter: \"%s\"",
defel->defname)));
457 errmsg(
"%s and %s are mutually exclusive options",
458 "connect = false",
"enabled = true")));
460 if (
opts->create_slot &&
464 errmsg(
"%s and %s are mutually exclusive options",
465 "connect = false",
"create_slot = true")));
467 if (
opts->copy_data &&
471 errmsg(
"%s and %s are mutually exclusive options",
472 "connect = false",
"copy_data = true")));
475 opts->enabled =
false;
476 opts->create_slot =
false;
477 opts->copy_data =
false;
484 if (!
opts->slot_name &&
493 errmsg(
"%s and %s are mutually exclusive options",
494 "slot_name = NONE",
"enabled = true")));
499 errmsg(
"subscription with %s must also set %s",
500 "slot_name = NONE",
"enabled = false")));
503 if (
opts->create_slot)
509 errmsg(
"%s and %s are mutually exclusive options",
510 "slot_name = NONE",
"create_slot = true")));
515 errmsg(
"subscription with %s must also set %s",
516 "slot_name = NONE",
"create_slot = false")));
544#define appendQuotedIdentifier(b, s) appendQuotedString(b, s, '"')
545#define appendQuotedLiteral(b, s) appendQuotedString(b, s, '\'')
548 * Check that the specified publications are present on the publisher.
551check_publications(WalReceiverConn *wrconn, List *publications)
553 WalRcvExecResult *res;
555 TupleTableSlot *slot;
556 List *publicationsCopy = NIL;
557 Oid tableRow[1] = {TEXTOID};
559 initStringInfo(&cmd);
560 appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
561 " pg_catalog.pg_publication t WHERE\n"
563 GetPublicationsStr(publications, &cmd, true);
564 appendStringInfoChar(&cmd, ')');
566 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
569 if (res->status != WALRCV_OK_TUPLES)
571 errmsg("could not receive list of publications from the publisher: %s",
574 publicationsCopy = list_copy(publications);
576 /* Process publication(s). */
577 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
578 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
583 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
586 /* Delete the publication present in publisher from the list. */
587 publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
588 ExecClearTuple(slot);
591 ExecDropSingleTupleTableSlot(slot);
593 walrcv_clear_result(res);
595 if (list_length(publicationsCopy))
597 /* Prepare the list of non-existent publication(s) for error message. */
598 StringInfoData pubnames;
600 initStringInfo(&pubnames);
602 GetPublicationsStr(publicationsCopy, &pubnames, false);
604 errcode(ERRCODE_UNDEFINED_OBJECT),
605 errmsg_plural("publication %s does not exist on the publisher",
606 "publications %s do not exist on the publisher",
607 list_length(publicationsCopy),
613 * Auxiliary function to build a text array out of a list of String nodes.
616publicationListToArray(List *publist)
620 MemoryContext memcxt;
621 MemoryContext oldcxt;
623 /* Create memory context for temporary allocations. */
624 memcxt = AllocSetContextCreate(CurrentMemoryContext,
625 "publicationListToArray to array",
626 ALLOCSET_DEFAULT_SIZES);
627 oldcxt = MemoryContextSwitchTo(memcxt);
629 datums = palloc_array(Datum, list_length(publist));
631 check_duplicates_in_publist(publist, datums);
633 MemoryContextSwitchTo(oldcxt);
635 arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
637 MemoryContextDelete(memcxt);
639 return PointerGetDatum(arr);
643 * Create new subscription.
646CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
650 ObjectAddress myself;
652 bool nulls[Natts_pg_subscription];
653 Datum values[Natts_pg_subscription];
654 Oid owner = GetUserId();
658 char originname[NAMEDATALEN];
660 uint32 supported_opts;
665 * Parse and check options.
667 * Connection and publication should not be specified here.
669 supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
670 SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
671 SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
672 SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
673 SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
674 SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
675 SUBOPT_RETAIN_DEAD_TUPLES |
676 SUBOPT_MAX_RETENTION_DURATION |
677 SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN);
678 parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
681 * Since creating a replication slot is not transactional, rolling back
682 * the transaction leaves the created replication slot. So we cannot run
683 * CREATE SUBSCRIPTION inside a transaction block if creating a
686 if (opts.create_slot)
687 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
690 * We don't want to allow unprivileged users to be able to trigger
691 * attempts to access arbitrary network destinations, so require the user
692 * to have been specifically authorized to create subscriptions.
694 if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
696 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
697 errmsg("permission denied to create subscription"),
698 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
699 "pg_create_subscription")));
719 errmsg(
"password_required=false is superuser-only"),
720 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
726#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
728 elog(
WARNING,
"subscriptions created by regression test cases should have names starting with \"regress_\"");
740 errmsg(
"subscription \"%s\" already exists",
749 opts.retaindeadtuples,
opts.retaindeadtuples,
750 (
opts.maxretention > 0));
757 if (
opts.synchronous_commit ==
NULL)
758 opts.synchronous_commit =
"off";
765 if (
opts.wal_receiver_timeout ==
NULL)
766 opts.wal_receiver_timeout =
"-1";
771 if (
stmt->servername)
794 conninfo =
stmt->conninfo;
800 publications =
stmt->publication;
804 memset(nulls,
false,
sizeof(nulls));
861 if (
stmt->servername)
901 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
919 if (
opts.retaindeadtuples)
960 if (
opts.create_slot)
992 (
errmsg(
"created replication slot \"%s\" on publisher",
1004 (
errmsg(
"subscription was created, but is not connected"),
1005 errhint(
"To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
1021 if (
opts.enabled ||
opts.retaindeadtuples)
1065 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
1280 errmsg_internal(
"sequence \"%s.%s\" removed from subscription \"%s\"",
1317 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
1336 errmsg_internal(
"sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
1414 errmsg(
"cannot set option \"%s\" for enabled subscription",
1428 errmsg(
"cannot set option \"%s\" for a subscription that does not have a slot name",
1478 errmsg(
"subscription \"%s\" does not exist",
1585 errmsg(
"password_required=false is superuser-only"),
1586 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1593 memset(nulls,
false,
sizeof(nulls));
1614 errmsg(
"cannot set %s for enabled subscription",
1615 "slot_name = NONE")));
1625 if (
opts.synchronous_commit)
1660 errmsg(
"password_required=false is superuser-only"),
1661 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1699 errmsg(
"\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1715 errmsg(
"cannot alter \"two_phase\" when logical replication worker is still running"),
1716 errhint(
"Try again after some time.")));
1729 errmsg(
"cannot disable \"two_phase\" when prepared transactions exist"),
1730 errhint(
"Resolve these transactions and try again.")));
1799 errmsg(
"cannot alter retain_dead_tuples when logical replication worker is still running"),
1800 errhint(
"Try again after some time.")));
1851 origin =
opts.origin;
1872 errmsg(
"cannot enable subscription that does not have a slot name")));
1912 if (
form->subserver)
1935 errmsg(
"subscription owner \"%s\" does not have permission on foreign server \"%s\"",
1970 if (
form->subserver)
2015 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
2016 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
2025 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
2026 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
2062 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
2066 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
2067 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
2076 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
2078 errhint(
"Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
2080 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
2081 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
2100 errmsg(
"%s is not allowed for disabled subscriptions",
2101 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
2123 errmsg(
"ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
2124 errhint(
"Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
2138 errmsg(
"%s is not allowed for disabled subscriptions",
2139 "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
2170 errmsg(
"skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
2183 elog(
ERROR,
"unrecognized ALTER SUBSCRIPTION kind %d",
2229 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
2291 *
err =
psprintf(
_(
"subscription owner \"%s\" does not have permission on foreign server \"%s\""),
2312 char *subconninfo =
NULL;
2316 char *conninfo =
NULL;
2342 if (!
stmt->missing_ok)
2345 errmsg(
"subscription \"%s\" does not exist",
2349 (
errmsg(
"subscription \"%s\" does not exist, skipping",
2524 conninfo = subconninfo;
2627 (
errmsg(
"dropped replication slot \"%s\" on publisher",
2636 (
errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
2637 slotname, res->
err)));
2644 errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
2645 slotname, res->
err)));
2682 errmsg(
"password_required=false is superuser-only"),
2683 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2714 errmsg(
"new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
2758 errmsg(
"subscription \"%s\" does not exist",
name)));
2790 errmsg(
"subscription with OID %u does not exist", subid)));
2861 "SELECT DISTINCT P.pubname AS pubname\n"
2862 "FROM pg_publication P,\n"
2863 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2864 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2865 " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2866 " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2867 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2868 "WHERE C.oid = GPT.relid AND P.pubname IN (");
2905 errmsg(
"could not receive list of replicated tables from the publisher: %s",
2944 errmsg(
"subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2946 errdetail_plural(
"The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2947 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2949 errhint(
"Verify that initial data copied from the publisher tables did not come from other origins."));
2953 errmsg(
"subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
2955 errdetail_plural(
"The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2956 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2958 errhint(
"Consider using origin = NONE or disabling retain_dead_tuples."));
2994 "SELECT DISTINCT P.pubname AS pubname\n"
2995 "FROM pg_publication P,\n"
2996 " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
2997 " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
2998 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2999 "WHERE C.oid = GPS.relid AND P.pubname IN (");
3019 "AND NOT (N.nspname = %s AND C.relname = %s)\n",
3032 errmsg(
"could not receive list of replicated sequences from the publisher: %s",
3064 errmsg(
"subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
3066 errdetail_plural(
"The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
3067 "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
3069 errhint(
"Verify that initial data copied from the publisher sequences did not come from other origins."));
3099 errmsg(
"cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
3106 errmsg(
"could not obtain recovery progress from the publisher: %s",
3111 elog(
ERROR,
"failed to fetch tuple for the recovery progress");
3118 errmsg(
"cannot enable retain_dead_tuples if the publisher is in recovery"));
3162 errmsg(
"\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
3163 errhint(
"\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
3168 errmsg(
"commit timestamp and origin data required for detecting conflicts won't be retained"),
3169 errhint(
"Consider setting \"%s\" to true.",
3170 "track_commit_timestamp"));
3175 errmsg(
"deleted rows to detect conflicts would not be removed until the subscription is enabled"),
3177 ?
errhint(
"Consider setting %s to false.",
3178 "retain_dead_tuples") : 0);
3184 errmsg(
"max_retention_duration is ineffective when retain_dead_tuples is disabled"));
3249 appendStringInfo(&cmd,
"SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
3250 " FROM pg_class c\n"
3251 " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
3252 " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
3253 " FROM pg_publication\n"
3254 " WHERE pubname IN ( %s )) AS gpt\n"
3255 " ON gpt.relid = c.oid\n",
3263 " FROM pg_catalog.pg_publication_sequences s\n"
3264 " WHERE s.pubname IN ( %s )",
3277 " WHERE t.pubname IN ( %s )",
3289 errmsg(
"could not receive list of replicated tables from the publisher: %s",
3317 errmsg(
"cannot use different column lists for table \"%s.%s\" in different publications",
3360 elog(
WARNING,
"could not drop tablesync replication slot \"%s\"",
3367 errmsg(
"could not connect to publisher when attempting to drop replication slot \"%s\": %s",
3370 errhint(
"Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
3371 "ALTER SUBSCRIPTION ... DISABLE",
3372 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
3401 errmsg(
"publication name \"%s\" used more than once",
3445 errmsg(
"publication \"%s\" is already in subscription \"%s\"",
3456 else if (!
addpub && !found)
3459 errmsg(
"publication \"%s\" is not in subscription \"%s\"",
3470 errmsg(
"cannot drop all the publications from a subscription")));
3527 errmsg(
"%s requires a Boolean value or \"parallel\"",
void check_can_set_role(Oid member, Oid role)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
void LogicalRepWorkersWakeupAtCommit(Oid subid)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define TextDatumGetCString(d)
#define Assert(condition)
#define OidIsValid(objectId)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
bool track_commit_timestamp
int32 defGetInt32(DefElem *def)
char * defGetString(DefElem *def)
bool defGetBoolean(DefElem *def)
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
void load_file(const char *filename, bool restricted)
int errcode(int sqlerrcode)
int errhint(const char *fmt,...) pg_attribute_printf(1
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define ereport(elevel,...)
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...) pg_attribute_printf(1
bool equal(const void *a, const void *b)
void err(int eval, const char *fmt,...)
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
#define palloc_object(type)
#define DirectFunctionCall1(func, arg1)
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
char * ForeignServerConnectionString(Oid userid, ForeignServer *server)
UserMapping * GetUserMapping(Oid userid, Oid serverid)
ForeignServer * GetForeignServer(Oid serverid)
bool parse_int(const char *value, int *result, int flags, const char **hintmsg)
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
void ApplyLauncherWakeupAtCommit(void)
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
List * lappend(List *list, void *datum)
List * list_append_unique(List *list, void *datum)
List * list_copy(const List *oldlist)
void list_free(List *list)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
char * get_rel_name(Oid relid)
char * get_database_name(Oid dbid)
char get_rel_relkind(Oid relid)
Oid get_rel_namespace(Oid relid)
char * get_namespace_name(Oid nspid)
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
char * pstrdup(const char *in)
void pfree(void *pointer)
char * GetUserNameFromId(Oid roleid, bool noerr)
Datum namein(PG_FUNCTION_ARGS)
#define RangeVarGetRelid(relation, lockmode, missing_ok)
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define InvokeObjectDropHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
int oid_cmp(const void *p1, const void *p2)
ReplOriginId replorigin_create(const char *roname)
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
@ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SERVER
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH_SEQUENCES
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
static AmcheckOptions opts
void recordDependencyOn(const ObjectAddress *depender, const ObjectAddress *referenced, DependencyType behavior)
long deleteDependencyRecordsFor(Oid classId, Oid objectId, bool skipExtensionDeps)
long deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char deptype, Oid refclassId, Oid refobjectId)
static int server_version
static int list_length(const List *l)
#define foreach_delete_current(lst, var_or_cell)
#define foreach_ptr(type, var, lst)
Datum pg_lsn_in(PG_FUNCTION_ARGS)
static Datum LSNGetDatum(XLogRecPtr X)
static XLogRecPtr DatumGetLSN(Datum X)
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
Subscription * GetSubscription(Oid subid, bool missing_ok, bool conninfo_needed, bool conninfo_aclcheck)
void RemoveSubscriptionRel(Oid subid, Oid relid)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
END_CATALOG_STRUCT typedef FormData_pg_subscription * Form_pg_subscription
static char buf[DEFAULT_XLOG_SEG_SIZE]
void pgstat_drop_subscription(Oid subid)
void pgstat_create_subscription(Oid subid)
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
static bool DatumGetBool(Datum X)
static Name DatumGetName(Datum X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static char DatumGetChar(Datum X)
static Datum CStringGetDatum(const char *X)
static Datum Int32GetDatum(int32 X)
static Datum CharGetDatum(char X)
char * psprintf(const char *fmt,...)
char * quote_literal_cstr(const char *rawstr)
#define RelationGetDescr(relation)
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
#define ERRCODE_DUPLICATE_OBJECT
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
LogicalRepWorkerType type
char * wal_receiver_timeout
char * synchronous_commit
Tuplestorestate * tuplestore
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char defGetStreamingMode(DefElem *def)
#define SUBOPT_CREATE_SLOT
#define SUBOPT_PASSWORD_REQUIRED
#define SUBOPT_SYNCHRONOUS_COMMIT
static void check_duplicates_in_publist(List *publist, Datum *datums)
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
#define SUBOPT_RETAIN_DEAD_TUPLES
static Datum publicationListToArray(List *publist)
static void check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static void check_publications(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_RUN_AS_OWNER
static char * construct_subserver_conninfo(Oid subserver, Oid subowner, char **err)
static List * fetch_relation_list(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
#define SUBOPT_DISABLE_ON_ERR
static void parse_subscription_options(ParseState *pstate, List *stmt_options, uint32 supported_opts, SubOpts *opts)
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
static void AlterSubscription_refresh_seq(Subscription *sub)
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
#define appendQuotedIdentifier(b, s)
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
#define SUBOPT_MAX_RETENTION_DURATION
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
#define SUBOPT_WAL_RECEIVER_TIMEOUT
static void check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static bool list_member_rangevar(const List *list, RangeVar *rv)
static void appendQuotedString(StringInfo buf, const char *str, char quote)
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
bool superuser_arg(Oid roleid)
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache2(SysCacheIdentifier cacheId, Datum key1, Datum key2)
Datum SysCacheGetAttrNotNull(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber)
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
#define SearchSysCacheCopy1(cacheId, key1)
#define SearchSysCacheCopy2(cacheId, key1, key2)
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
void UpdateTwoPhaseState(Oid suboid, char new_state)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
bool LookupGXactBySubid(Oid subid)
String * makeString(char *str)
static WalReceiverConn * wrconn
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
#define XLogRecPtrIsValid(r)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr