/*
 * Bit flags, one per subscription option.
 *
 * Every value is a distinct single bit, so any set of options can be
 * represented as the bitwise OR of these constants and tested with a mask
 * operation.
 *
 * NOTE(review): presumably these are the bits that make up the
 * "supported_opts" mask passed to parse_subscription_options(), and the
 * names appear to mirror the option keywords that function compares against
 * ("connect", "enabled", "create_slot", "slot_name", "copy_data",
 * "synchronous_commit", "refresh", "binary", "streaming", "two_phase",
 * "disable_on_error", "lsn", "origin") -- the consuming code is not fully
 * visible here, so confirm before relying on this.
 */
56 #define SUBOPT_CONNECT 0x00000001
57 #define SUBOPT_ENABLED 0x00000002
58 #define SUBOPT_CREATE_SLOT 0x00000004
59 #define SUBOPT_SLOT_NAME 0x00000008
60 #define SUBOPT_COPY_DATA 0x00000010
61 #define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
62 #define SUBOPT_REFRESH 0x00000040
63 #define SUBOPT_BINARY 0x00000080
64 #define SUBOPT_STREAMING 0x00000100
65 #define SUBOPT_TWOPHASE_COMMIT 0x00000200
66 #define SUBOPT_DISABLE_ON_ERR 0x00000400
67 #define SUBOPT_LSN 0x00000800
68 #define SUBOPT_ORIGIN 0x00001000
/*
 * IsSet
 *		Evaluates to nonzero when every bit of "bits" is also set in "val",
 *		and to zero otherwise.  When "bits" names several flags, all of them
 *		must be present for the test to succeed.
 *
 * "bits" appears twice in the expansion and is therefore evaluated twice;
 * do not pass an expression with side effects for it.
 */
71 #define IsSet(val, bits) (((val) & (bits)) == (bits))
97 List *publications,
bool copydata,
98 char *origin,
Oid *subrel_local_oids,
99 int subrel_count,
char *
subname);
121 Assert(supported_opts != 0);
130 opts->connect =
true;
132 opts->enabled =
true;
134 opts->create_slot =
true;
136 opts->copy_data =
true;
138 opts->refresh =
true;
140 opts->binary =
false;
144 opts->twophase =
false;
146 opts->disableonerr =
false;
151 foreach(lc, stmt_options)
156 strcmp(defel->
defname,
"connect") == 0)
165 strcmp(defel->
defname,
"enabled") == 0)
174 strcmp(defel->
defname,
"create_slot") == 0)
183 strcmp(defel->
defname,
"slot_name") == 0)
192 if (strcmp(
opts->slot_name,
"none") == 0)
193 opts->slot_name = NULL;
198 strcmp(defel->
defname,
"copy_data") == 0)
207 strcmp(defel->
defname,
"synchronous_commit") == 0)
221 strcmp(defel->
defname,
"refresh") == 0)
230 strcmp(defel->
defname,
"binary") == 0)
239 strcmp(defel->
defname,
"streaming") == 0)
247 else if (strcmp(defel->
defname,
"two_phase") == 0)
259 (
errcode(ERRCODE_SYNTAX_ERROR),
260 errmsg(
"unrecognized subscription parameter: \"%s\"", defel->
defname)));
269 strcmp(defel->
defname,
"disable_on_error") == 0)
278 strcmp(defel->
defname,
"origin") == 0)
297 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
298 errmsg(
"unrecognized origin value: \"%s\"",
opts->origin));
301 strcmp(defel->
defname,
"lsn") == 0)
310 if (strcmp(lsn_str,
"none") == 0)
320 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
321 errmsg(
"invalid WAL location (LSN): %s", lsn_str)));
329 (
errcode(ERRCODE_SYNTAX_ERROR),
330 errmsg(
"unrecognized subscription parameter: \"%s\"", defel->
defname)));
343 (
errcode(ERRCODE_SYNTAX_ERROR),
345 errmsg(
"%s and %s are mutually exclusive options",
346 "connect = false",
"enabled = true")));
348 if (
opts->create_slot &&
351 (
errcode(ERRCODE_SYNTAX_ERROR),
352 errmsg(
"%s and %s are mutually exclusive options",
353 "connect = false",
"create_slot = true")));
355 if (
opts->copy_data &&
358 (
errcode(ERRCODE_SYNTAX_ERROR),
359 errmsg(
"%s and %s are mutually exclusive options",
360 "connect = false",
"copy_data = true")));
363 opts->enabled =
false;
364 opts->create_slot =
false;
365 opts->copy_data =
false;
372 if (!
opts->slot_name &&
379 (
errcode(ERRCODE_SYNTAX_ERROR),
381 errmsg(
"%s and %s are mutually exclusive options",
382 "slot_name = NONE",
"enabled = true")));
385 (
errcode(ERRCODE_SYNTAX_ERROR),
387 errmsg(
"subscription with %s must also set %s",
388 "slot_name = NONE",
"enabled = false")));
391 if (
opts->create_slot)
395 (
errcode(ERRCODE_SYNTAX_ERROR),
397 errmsg(
"%s and %s are mutually exclusive options",
398 "slot_name = NONE",
"create_slot = true")));
401 (
errcode(ERRCODE_SYNTAX_ERROR),
403 errmsg(
"subscription with %s must also set %s",
404 "slot_name = NONE",
"create_slot = false")));
420 foreach(lc, publications)
450 Oid tableRow[1] = {TEXTOID};
454 " pg_catalog.pg_publication t WHERE\n"
465 errmsg(
"could not receive list of publications from the publisher: %s",
468 publicationsCopy =
list_copy(publications);
496 errcode(ERRCODE_UNDEFINED_OBJECT),
497 errmsg_plural(
"publication %s does not exist on the publisher",
498 "publications %s do not exist on the publisher",
517 "publicationListToArray to array",
544 bool nulls[Natts_pg_subscription];
572 if (
opts.create_slot)
577 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
578 errmsg(
"must be superuser to create subscriptions")));
584 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
585 if (strncmp(
stmt->subname,
"regress_", 8) != 0)
586 elog(
WARNING,
"subscriptions created by regression test cases should have names starting with \"regress_\"");
598 errmsg(
"subscription \"%s\" already exists",
603 opts.slot_name == NULL)
607 if (
opts.synchronous_commit == NULL)
608 opts.synchronous_commit =
"off";
610 conninfo =
stmt->conninfo;
611 publications =
stmt->publication;
621 memset(nulls,
false,
sizeof(nulls));
624 Anum_pg_subscription_oid);
628 values[Anum_pg_subscription_subname - 1] =
634 values[Anum_pg_subscription_subtwophasestate - 1] =
639 values[Anum_pg_subscription_subconninfo - 1] =
642 values[Anum_pg_subscription_subslotname - 1] =
645 nulls[Anum_pg_subscription_subslotname - 1] =
true;
646 values[Anum_pg_subscription_subsynccommit - 1] =
648 values[Anum_pg_subscription_subpublications - 1] =
650 values[Anum_pg_subscription_suborigin - 1] =
680 (
errcode(ERRCODE_CONNECTION_FAILURE),
681 errmsg(
"could not connect to the publisher: %s",
err)));
687 opts.origin, NULL, 0,
stmt->subname);
693 table_state =
opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
720 if (
opts.create_slot)
722 bool twophase_enabled =
false;
742 if (
opts.twophase && !
opts.copy_data && tables !=
NIL)
743 twophase_enabled =
true;
748 if (twophase_enabled)
752 (
errmsg(
"created replication slot \"%s\" on publisher",
764 (
errmsg(
"subscription was created, but is not connected"),
765 errhint(
"To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
783 List *validate_publications)
788 Oid *subrel_local_oids;
789 Oid *pubrel_local_oids;
795 typedef struct SubRemoveRels
800 SubRemoveRels *sub_remove_rels;
810 (
errcode(ERRCODE_CONNECTION_FAILURE),
811 errmsg(
"could not connect to the publisher: %s",
err)));
815 if (validate_publications)
830 subrel_local_oids =
palloc(subrel_count *
sizeof(
Oid));
832 foreach(lc, subrel_states)
836 subrel_local_oids[off++] = relstate->
relid;
838 qsort(subrel_local_oids, subrel_count,
842 sub->
origin, subrel_local_oids,
843 subrel_count, sub->
name);
849 sub_remove_rels =
palloc(subrel_count *
sizeof(SubRemoveRels));
861 foreach(lc, pubrel_names)
872 pubrel_local_oids[off++] = relid;
874 if (!bsearch(&relid, subrel_local_oids,
878 copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
894 for (off = 0; off < subrel_count; off++)
896 Oid relid = subrel_local_oids[off];
898 if (!bsearch(&relid, pubrel_local_oids,
924 sub_remove_rels[remove_rel_len].relid = relid;
925 sub_remove_rels[remove_rel_len++].state =
state;
935 if (
state != SUBREL_STATE_READY)
967 for (off = 0; off < remove_rel_len; off++)
969 if (sub_remove_rels[off].
state != SUBREL_STATE_READY &&
970 sub_remove_rels[off].
state != SUBREL_STATE_SYNCDONE)
985 syncslotname,
sizeof(syncslotname));
1009 bool nulls[Natts_pg_subscription];
1010 bool replaces[Natts_pg_subscription];
1014 bool update_tuple =
false;
1028 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1029 errmsg(
"subscription \"%s\" does not exist",
1047 memset(nulls,
false,
sizeof(nulls));
1048 memset(replaces,
false,
sizeof(replaces));
1060 supported_opts, &
opts);
1073 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1074 errmsg(
"cannot set %s for enabled subscription",
1075 "slot_name = NONE")));
1078 values[Anum_pg_subscription_subslotname - 1] =
1081 nulls[Anum_pg_subscription_subslotname - 1] =
true;
1082 replaces[Anum_pg_subscription_subslotname - 1] =
true;
1085 if (
opts.synchronous_commit)
1087 values[Anum_pg_subscription_subsynccommit - 1] =
1089 replaces[Anum_pg_subscription_subsynccommit - 1] =
true;
1094 values[Anum_pg_subscription_subbinary - 1] =
1096 replaces[Anum_pg_subscription_subbinary - 1] =
true;
1101 values[Anum_pg_subscription_substream - 1] =
1103 replaces[Anum_pg_subscription_substream - 1] =
true;
1108 values[Anum_pg_subscription_subdisableonerr - 1]
1110 replaces[Anum_pg_subscription_subdisableonerr - 1]
1116 values[Anum_pg_subscription_suborigin - 1] =
1118 replaces[Anum_pg_subscription_suborigin - 1] =
true;
1121 update_tuple =
true;
1133 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1134 errmsg(
"cannot enable subscription that does not have a slot name")));
1136 values[Anum_pg_subscription_subenabled - 1] =
1138 replaces[Anum_pg_subscription_subenabled - 1] =
true;
1143 update_tuple =
true;
1153 values[Anum_pg_subscription_subconninfo - 1] =
1155 replaces[Anum_pg_subscription_subconninfo - 1] =
true;
1156 update_tuple =
true;
1163 supported_opts, &
opts);
1165 values[Anum_pg_subscription_subpublications - 1] =
1167 replaces[Anum_pg_subscription_subpublications - 1] =
true;
1169 update_tuple =
true;
1176 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1177 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1178 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1186 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1187 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1188 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1210 supported_opts, &
opts);
1213 values[Anum_pg_subscription_subpublications - 1] =
1215 replaces[Anum_pg_subscription_subpublications - 1] =
true;
1217 update_tuple =
true;
1223 List *validate_publications = (isadd) ?
stmt->publication : NULL;
1227 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1228 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1232 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1233 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1241 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1242 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1244 errhint(
"Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1246 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1247 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1255 validate_publications);
1265 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1266 errmsg(
"ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1290 (
errcode(ERRCODE_SYNTAX_ERROR),
1291 errmsg(
"ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1292 errhint(
"Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1310 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1311 errmsg(
"must be superuser to skip transaction")));
1324 originname,
sizeof(originname));
1331 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1332 errmsg(
"skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
1338 replaces[Anum_pg_subscription_subskiplsn - 1] =
true;
1340 update_tuple =
true;
1345 elog(
ERROR,
"unrecognized ALTER SUBSCRIPTION kind %d",
1408 if (!
stmt->missing_ok)
1410 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1411 errmsg(
"subscription \"%s\" does not exist",
1415 (
errmsg(
"subscription \"%s\" does not exist, skipping",
1440 Anum_pg_subscription_subname, &isnull);
1446 Anum_pg_subscription_subconninfo, &isnull);
1452 Anum_pg_subscription_subslotname, &isnull);
1499 foreach(lc, subworkers)
1527 foreach(lc, rstates)
1544 sizeof(originname));
1562 if (!slotname && rstates ==
NIL)
1599 foreach(lc, rstates)
1620 if (rstate->
state != SUBREL_STATE_SYNCDONE)
1625 sizeof(syncslotname));
1683 (
errmsg(
"dropped replication slot \"%s\" on publisher",
1688 res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
1692 (
errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
1693 slotname,
res->err)));
1699 (
errcode(ERRCODE_CONNECTION_FAILURE),
1700 errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
1701 slotname,
res->err)));
1723 if (form->subowner == newOwnerId)
1733 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1734 errmsg(
"permission denied to change owner of subscription \"%s\"",
1736 errhint(
"The owner of a subscription must be a superuser.")));
1738 form->subowner = newOwnerId;
1773 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1774 errmsg(
"subscription \"%s\" does not exist",
name)));
1805 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1806 errmsg(
"subscription with OID %u does not exist", subid)));
1831 bool copydata,
char *origin,
Oid *subrel_local_oids,
1832 int subrel_count,
char *
subname)
1837 Oid tableRow[1] = {TEXTOID};
1841 if (!copydata || !origin ||
1847 "SELECT DISTINCT P.pubname AS pubname\n"
1848 "FROM pg_publication P,\n"
1849 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
1850 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
1851 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
1852 "WHERE C.oid = GPT.relid AND P.pubname IN (");
1861 for (
i = 0;
i < subrel_count;
i++)
1863 Oid relid = subrel_local_oids[
i];
1867 appendStringInfo(&cmd,
"AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
1868 schemaname, tablename);
1876 (
errcode(ERRCODE_CONNECTION_FAILURE),
1877 errmsg(
"could not receive list of replicated tables from the publisher: %s",
1912 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1913 errmsg(
"subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
1915 errdetail_plural(
"Subscribed publication %s is subscribing to other publications.",
1916 "Subscribed publications %s are subscribing to other publications.",
1918 errhint(
"Verify that initial data copied from the publisher tables did not come from other origins."));
1941 Oid tableRow[3] = {TEXTOID, TEXTOID, NAMEARRAYOID};
1949 if (check_columnlist)
1953 " WHERE t.pubname IN (");
1962 (
errcode(ERRCODE_CONNECTION_FAILURE),
1963 errmsg(
"could not receive list of replicated tables from the publisher: %s",
1982 if (check_columnlist &&
list_member(tablelist, rv))
1984 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1985 errmsg(
"cannot use different column lists for table \"%s.%s\" in different publications",
1988 tablelist =
lappend(tablelist, rv);
2009 foreach(lc, rstates)
2022 if (rstate->
state != SUBREL_STATE_SYNCDONE)
2027 sizeof(syncslotname));
2028 elog(
WARNING,
"could not drop tablesync replication slot \"%s\"",
2034 (
errcode(ERRCODE_CONNECTION_FAILURE),
2035 errmsg(
"could not connect to publisher when attempting to drop replication slot \"%s\": %s",
2038 errhint(
"Use %s to disassociate the subscription from the slot.",
2039 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
2053 foreach(cell, publist)
2058 foreach(pcell, publist)
2065 if (strcmp(
name, pname) == 0)
2068 errmsg(
"publication name \"%s\" used more than once",
2096 foreach(lc, newpublist)
2102 foreach(lc2, oldpublist)
2106 if (strcmp(
name, pubname) == 0)
2112 errmsg(
"publication \"%s\" is already in subscription \"%s\"",
2121 if (addpub && !found)
2123 else if (!addpub && !found)
2125 (
errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2126 errmsg(
"publication \"%s\" is not in subscription \"%s\"",
2136 (
errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2137 errmsg(
"cannot drop all the publications from a subscription")));
2193 (
errcode(ERRCODE_SYNTAX_ERROR),
2194 errmsg(
"%s requires a Boolean value or \"parallel\"",
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
ArrayType * construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
void LogicalRepWorkersWakeupAtCommit(Oid subid)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define TextDatumGetCString(d)
#define OidIsValid(objectId)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
static void PGresult * res
elog(ERROR, "%s: %s", p2, msg)
bool defGetBoolean(DefElem *def)
char * defGetString(DefElem *def)
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
void load_file(const char *filename, bool restricted)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
#define DirectFunctionCall1(func, arg1)
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
if(TABLE==NULL||TABLE_index==NULL)
List * logicalrep_workers_find(Oid subid, bool only_running)
void logicalrep_worker_stop(Oid subid, Oid relid)
void ApplyLauncherWakeupAtCommit(void)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
List * list_copy(const List *oldlist)
List * list_append_unique(List *list, void *datum)
List * list_delete(List *list, void *datum)
void list_free(List *list)
bool list_member(const List *list, const void *datum)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
char * get_namespace_name(Oid nspid)
char get_rel_relkind(Oid relid)
Oid get_rel_namespace(Oid relid)
char * get_rel_name(Oid relid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
Datum namein(PG_FUNCTION_ARGS)
#define RangeVarGetRelid(relation, lockmode, missing_ok)
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define InvokeObjectDropHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
int oid_cmp(const void *p1, const void *p2)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_create(const char *roname)
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
static AmcheckOptions opts
static int list_length(const List *l)
#define foreach_delete_current(lst, cell)
Datum pg_lsn_in(PG_FUNCTION_ARGS)
static Datum LSNGetDatum(XLogRecPtr X)
static XLogRecPtr DatumGetLSN(Datum X)
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void RemoveSubscriptionRel(Oid subid, Oid relid)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
Subscription * GetSubscription(Oid subid, bool missing_ok)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define LOGICALREP_ORIGIN_NONE
#define LOGICALREP_STREAM_ON
#define LOGICALREP_ORIGIN_ANY
#define LOGICALREP_STREAM_OFF
#define LOGICALREP_STREAM_PARALLEL
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
FormData_pg_subscription * Form_pg_subscription
void pgstat_drop_subscription(Oid subid)
void pgstat_create_subscription(Oid subid)
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
static Datum PointerGetDatum(const void *X)
static Name DatumGetName(Datum X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static Datum CStringGetDatum(const char *X)
static Datum CharGetDatum(char X)
char * quote_literal_cstr(const char *rawstr)
Datum quote_literal(PG_FUNCTION_ARGS)
#define RelationGetDescr(relation)
const char * quote_identifier(const char *ident)
bool ReplicationSlotValidateName(const char *name, int elevel)
#define ERRCODE_DUPLICATE_OBJECT
StringInfo makeStringInfo(void)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
char * synchronous_commit
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char defGetStreamingMode(DefElem *def)
#define SUBOPT_CREATE_SLOT
ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
#define SUBOPT_SYNCHRONOUS_COMMIT
static void check_duplicates_in_publist(List *publist, Datum *datums)
static Datum publicationListToArray(List *publist)
static void get_publications_str(List *publications, StringInfo dest, bool quote_literal)
static void parse_subscription_options(ParseState *pstate, List *stmt_options, bits32 supported_opts, SubOpts *opts)
static void check_publications(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
#define SUBOPT_DISABLE_ON_ERR
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
bool superuser_arg(Oid roleid)
void ReleaseSysCache(HeapTuple tuple)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
#define SearchSysCacheCopy1(cacheId, key1)
#define SearchSysCacheCopy2(cacheId, key1, key2)
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
void UpdateTwoPhaseState(Oid suboid, char new_state)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
String * makeString(char *str)
static WalReceiverConn * wrconn
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
#define walrcv_check_conninfo(conninfo)
#define walrcv_connect(conninfo, logical, appname, err)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr