27 #include "catalog/pg_authid_d.h"
28 #include "catalog/pg_database_d.h"
/*
 * Subscription option flags.  Every SUBOPT_* value below is a distinct
 * single bit, so any combination of them can be OR'ed into one bitmask and
 * tested with IsSet().
 *
 * NOTE(review): presumably these are the bits carried in the "supported_opts"
 * mask that is handed to parse_subscription_options() -- confirm against the
 * callers.
 */
60 #define SUBOPT_CONNECT 0x00000001
61 #define SUBOPT_ENABLED 0x00000002
62 #define SUBOPT_CREATE_SLOT 0x00000004
63 #define SUBOPT_SLOT_NAME 0x00000008
64 #define SUBOPT_COPY_DATA 0x00000010
65 #define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
66 #define SUBOPT_REFRESH 0x00000040
67 #define SUBOPT_BINARY 0x00000080
68 #define SUBOPT_STREAMING 0x00000100
69 #define SUBOPT_TWOPHASE_COMMIT 0x00000200
70 #define SUBOPT_DISABLE_ON_ERR 0x00000400
71 #define SUBOPT_PASSWORD_REQUIRED 0x00000800
72 #define SUBOPT_RUN_AS_OWNER 0x00001000
73 #define SUBOPT_FAILOVER 0x00002000
74 #define SUBOPT_LSN 0x00004000
75 #define SUBOPT_ORIGIN 0x00008000
/*
 * IsSet(val, bits) is true only when every bit of 'bits' is also set in
 * 'val' (an all-of test, not any-of).  'bits' appears twice in the
 * expansion, so do not pass an expression with side effects.
 */
78 #define IsSet(val, bits) (((val) & (bits)) == (bits))
107 List *publications,
bool copydata,
108 char *origin,
Oid *subrel_local_oids,
109 int subrel_count,
char *
subname);
114 bool slot_needs_update,
bool isTopLevel);
133 Assert(supported_opts != 0);
142 opts->connect =
true;
144 opts->enabled =
true;
146 opts->create_slot =
true;
148 opts->copy_data =
true;
150 opts->refresh =
true;
152 opts->binary =
false;
156 opts->twophase =
false;
158 opts->disableonerr =
false;
160 opts->passwordrequired =
true;
162 opts->runasowner =
false;
164 opts->failover =
false;
169 foreach(lc, stmt_options)
174 strcmp(defel->
defname,
"connect") == 0)
183 strcmp(defel->
defname,
"enabled") == 0)
192 strcmp(defel->
defname,
"create_slot") == 0)
201 strcmp(defel->
defname,
"slot_name") == 0)
210 if (strcmp(
opts->slot_name,
"none") == 0)
211 opts->slot_name = NULL;
216 strcmp(defel->
defname,
"copy_data") == 0)
225 strcmp(defel->
defname,
"synchronous_commit") == 0)
239 strcmp(defel->
defname,
"refresh") == 0)
248 strcmp(defel->
defname,
"binary") == 0)
257 strcmp(defel->
defname,
"streaming") == 0)
266 strcmp(defel->
defname,
"two_phase") == 0)
275 strcmp(defel->
defname,
"disable_on_error") == 0)
284 strcmp(defel->
defname,
"password_required") == 0)
293 strcmp(defel->
defname,
"run_as_owner") == 0)
302 strcmp(defel->
defname,
"failover") == 0)
311 strcmp(defel->
defname,
"origin") == 0)
330 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
331 errmsg(
"unrecognized origin value: \"%s\"",
opts->origin));
334 strcmp(defel->
defname,
"lsn") == 0)
343 if (strcmp(lsn_str,
"none") == 0)
353 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
354 errmsg(
"invalid WAL location (LSN): %s", lsn_str)));
362 (
errcode(ERRCODE_SYNTAX_ERROR),
363 errmsg(
"unrecognized subscription parameter: \"%s\"", defel->
defname)));
376 (
errcode(ERRCODE_SYNTAX_ERROR),
378 errmsg(
"%s and %s are mutually exclusive options",
379 "connect = false",
"enabled = true")));
381 if (
opts->create_slot &&
384 (
errcode(ERRCODE_SYNTAX_ERROR),
385 errmsg(
"%s and %s are mutually exclusive options",
386 "connect = false",
"create_slot = true")));
388 if (
opts->copy_data &&
391 (
errcode(ERRCODE_SYNTAX_ERROR),
392 errmsg(
"%s and %s are mutually exclusive options",
393 "connect = false",
"copy_data = true")));
396 opts->enabled =
false;
397 opts->create_slot =
false;
398 opts->copy_data =
false;
405 if (!
opts->slot_name &&
412 (
errcode(ERRCODE_SYNTAX_ERROR),
414 errmsg(
"%s and %s are mutually exclusive options",
415 "slot_name = NONE",
"enabled = true")));
418 (
errcode(ERRCODE_SYNTAX_ERROR),
420 errmsg(
"subscription with %s must also set %s",
421 "slot_name = NONE",
"enabled = false")));
424 if (
opts->create_slot)
428 (
errcode(ERRCODE_SYNTAX_ERROR),
430 errmsg(
"%s and %s are mutually exclusive options",
431 "slot_name = NONE",
"create_slot = true")));
434 (
errcode(ERRCODE_SYNTAX_ERROR),
436 errmsg(
"subscription with %s must also set %s",
437 "slot_name = NONE",
"create_slot = false")));
453 foreach(lc, publications)
483 Oid tableRow[1] = {TEXTOID};
487 " pg_catalog.pg_publication t WHERE\n"
497 errmsg(
"could not receive list of publications from the publisher: %s",
500 publicationsCopy =
list_copy(publications);
528 errcode(ERRCODE_UNDEFINED_OBJECT),
529 errmsg_plural(
"publication %s does not exist on the publisher",
530 "publications %s do not exist on the publisher",
549 "publicationListToArray to array",
576 bool nulls[Natts_pg_subscription];
606 if (
opts.create_slot)
616 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
617 errmsg(
"permission denied to create subscription"),
618 errdetail(
"Only roles with privileges of the \"%s\" role may create subscriptions.",
619 "pg_create_subscription")));
638 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
639 errmsg(
"password_required=false is superuser-only"),
640 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
646 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
647 if (strncmp(
stmt->subname,
"regress_", 8) != 0)
648 elog(
WARNING,
"subscriptions created by regression test cases should have names starting with \"regress_\"");
660 errmsg(
"subscription \"%s\" already exists",
665 opts.slot_name == NULL)
669 if (
opts.synchronous_commit == NULL)
670 opts.synchronous_commit =
"off";
672 conninfo =
stmt->conninfo;
673 publications =
stmt->publication;
683 memset(nulls,
false,
sizeof(nulls));
686 Anum_pg_subscription_oid);
690 values[Anum_pg_subscription_subname - 1] =
696 values[Anum_pg_subscription_subtwophasestate - 1] =
704 values[Anum_pg_subscription_subconninfo - 1] =
707 values[Anum_pg_subscription_subslotname - 1] =
710 nulls[Anum_pg_subscription_subslotname - 1] =
true;
711 values[Anum_pg_subscription_subsynccommit - 1] =
713 values[Anum_pg_subscription_subpublications - 1] =
715 values[Anum_pg_subscription_suborigin - 1] =
740 bool must_use_password;
748 (
errcode(ERRCODE_CONNECTION_FAILURE),
749 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
756 opts.origin, NULL, 0,
stmt->subname);
762 table_state =
opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
789 if (
opts.create_slot)
791 bool twophase_enabled =
false;
811 if (
opts.twophase && !
opts.copy_data && tables !=
NIL)
812 twophase_enabled =
true;
817 if (twophase_enabled)
821 (
errmsg(
"created replication slot \"%s\" on publisher",
833 (
errmsg(
"subscription was created, but is not connected"),
834 errhint(
"To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
852 List *validate_publications)
857 Oid *subrel_local_oids;
858 Oid *pubrel_local_oids;
864 typedef struct SubRemoveRels
869 SubRemoveRels *sub_remove_rels;
871 bool must_use_password;
882 (
errcode(ERRCODE_CONNECTION_FAILURE),
883 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
888 if (validate_publications)
903 subrel_local_oids =
palloc(subrel_count *
sizeof(
Oid));
905 foreach(lc, subrel_states)
909 subrel_local_oids[off++] = relstate->
relid;
911 qsort(subrel_local_oids, subrel_count,
915 sub->
origin, subrel_local_oids,
916 subrel_count, sub->
name);
922 sub_remove_rels =
palloc(subrel_count *
sizeof(SubRemoveRels));
934 foreach(lc, pubrel_names)
945 pubrel_local_oids[off++] = relid;
947 if (!bsearch(&relid, subrel_local_oids,
951 copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
967 for (off = 0; off < subrel_count; off++)
969 Oid relid = subrel_local_oids[off];
971 if (!bsearch(&relid, pubrel_local_oids,
997 sub_remove_rels[remove_rel_len].relid = relid;
998 sub_remove_rels[remove_rel_len++].state =
state;
1008 if (
state != SUBREL_STATE_READY)
1023 sizeof(originname));
1040 for (off = 0; off < remove_rel_len; off++)
1042 if (sub_remove_rels[off].
state != SUBREL_STATE_READY &&
1043 sub_remove_rels[off].
state != SUBREL_STATE_SYNCDONE)
1058 syncslotname,
sizeof(syncslotname));
1078 bool slot_needs_update,
bool isTopLevel)
1085 strcmp(
option,
"two_phase") == 0);
1100 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1101 errmsg(
"cannot set option \"%s\" for enabled subscription",
1104 if (slot_needs_update)
1114 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1115 errmsg(
"cannot set option \"%s\" for a subscription that does not have a slot name",
1136 bool nulls[Natts_pg_subscription];
1137 bool replaces[Natts_pg_subscription];
1141 bool update_tuple =
false;
1142 bool update_failover =
false;
1143 bool update_two_phase =
false;
1157 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1158 errmsg(
"subscription \"%s\" does not exist",
1177 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1178 errmsg(
"password_required=false is superuser-only"),
1179 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1186 memset(nulls,
false,
sizeof(nulls));
1187 memset(replaces,
false,
sizeof(replaces));
1202 supported_opts, &
opts);
1215 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1216 errmsg(
"cannot set %s for enabled subscription",
1217 "slot_name = NONE")));
1220 values[Anum_pg_subscription_subslotname - 1] =
1223 nulls[Anum_pg_subscription_subslotname - 1] =
true;
1224 replaces[Anum_pg_subscription_subslotname - 1] =
true;
1227 if (
opts.synchronous_commit)
1229 values[Anum_pg_subscription_subsynccommit - 1] =
1231 replaces[Anum_pg_subscription_subsynccommit - 1] =
true;
1236 values[Anum_pg_subscription_subbinary - 1] =
1238 replaces[Anum_pg_subscription_subbinary - 1] =
true;
1243 values[Anum_pg_subscription_substream - 1] =
1245 replaces[Anum_pg_subscription_substream - 1] =
true;
1250 values[Anum_pg_subscription_subdisableonerr - 1]
1252 replaces[Anum_pg_subscription_subdisableonerr - 1]
1261 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1262 errmsg(
"password_required=false is superuser-only"),
1263 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1265 values[Anum_pg_subscription_subpasswordrequired - 1]
1267 replaces[Anum_pg_subscription_subpasswordrequired - 1]
1273 values[Anum_pg_subscription_subrunasowner - 1] =
1275 replaces[Anum_pg_subscription_subrunasowner - 1] =
true;
1287 update_two_phase = !
opts.twophase;
1297 if (update_two_phase &&
1300 (
errcode(ERRCODE_SYNTAX_ERROR),
1301 errmsg(
"slot_name and two_phase cannot be altered at the same time")));
1316 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1317 errmsg(
"cannot alter two_phase when logical replication worker is still running"),
1318 errhint(
"Try again after some time.")));
1326 if (update_two_phase &&
1330 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1331 errmsg(
"cannot disable two_phase when prepared transactions are present"),
1332 errhint(
"Resolve these transactions and try again.")));
1335 values[Anum_pg_subscription_subtwophasestate - 1] =
1339 replaces[Anum_pg_subscription_subtwophasestate - 1] =
true;
1349 update_failover =
true;
1354 values[Anum_pg_subscription_subfailover - 1] =
1356 replaces[Anum_pg_subscription_subfailover - 1] =
true;
1361 values[Anum_pg_subscription_suborigin - 1] =
1363 replaces[Anum_pg_subscription_suborigin - 1] =
true;
1366 update_tuple =
true;
1378 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1379 errmsg(
"cannot enable subscription that does not have a slot name")));
1381 values[Anum_pg_subscription_subenabled - 1] =
1383 replaces[Anum_pg_subscription_subenabled - 1] =
true;
1388 update_tuple =
true;
1399 values[Anum_pg_subscription_subconninfo - 1] =
1401 replaces[Anum_pg_subscription_subconninfo - 1] =
true;
1402 update_tuple =
true;
1409 supported_opts, &
opts);
1411 values[Anum_pg_subscription_subpublications - 1] =
1413 replaces[Anum_pg_subscription_subpublications - 1] =
true;
1415 update_tuple =
true;
1422 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1423 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1424 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1432 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1433 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1434 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1456 supported_opts, &
opts);
1459 values[Anum_pg_subscription_subpublications - 1] =
1461 replaces[Anum_pg_subscription_subpublications - 1] =
true;
1463 update_tuple =
true;
1469 List *validate_publications = (isadd) ?
stmt->publication : NULL;
1473 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1474 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1478 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1479 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1487 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1488 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1490 errhint(
"Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1492 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1493 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1501 validate_publications);
1511 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1512 errmsg(
"ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1536 (
errcode(ERRCODE_SYNTAX_ERROR),
1537 errmsg(
"ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1538 errhint(
"Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1565 originname,
sizeof(originname));
1572 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1573 errmsg(
"skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
1579 replaces[Anum_pg_subscription_subskiplsn - 1] =
true;
1581 update_tuple =
true;
1586 elog(
ERROR,
"unrecognized ALTER SUBSCRIPTION kind %d",
1609 if (update_failover || update_two_phase)
1611 bool must_use_password;
1624 (
errcode(ERRCODE_CONNECTION_FAILURE),
1625 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
1631 update_failover ? &
opts.failover : NULL,
1632 update_two_phase ? &
opts.twophase : NULL);
1676 bool must_use_password;
1691 if (!
stmt->missing_ok)
1693 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1694 errmsg(
"subscription \"%s\" does not exist",
1698 (
errmsg(
"subscription \"%s\" does not exist, skipping",
1706 subowner = form->subowner;
1707 must_use_password = !
superuser_arg(subowner) && form->subpasswordrequired;
1725 Anum_pg_subscription_subname);
1730 Anum_pg_subscription_subconninfo);
1735 Anum_pg_subscription_subslotname, &isnull);
1780 foreach(lc, subworkers)
1808 foreach(lc, rstates)
1825 sizeof(originname));
1849 if (!slotname && rstates ==
NIL)
1887 foreach(lc, rstates)
1908 if (rstate->
state != SUBREL_STATE_SYNCDONE)
1913 sizeof(syncslotname));
1965 (
errmsg(
"dropped replication slot \"%s\" on publisher",
1970 res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
1974 (
errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
1975 slotname,
res->err)));
1981 (
errcode(ERRCODE_CONNECTION_FAILURE),
1982 errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
1983 slotname,
res->err)));
2006 if (form->subowner == newOwnerId)
2017 if (!form->subpasswordrequired && !
superuser())
2019 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2020 errmsg(
"password_required=false is superuser-only"),
2021 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2039 form->subowner = newOwnerId;
2074 (
errcode(ERRCODE_UNDEFINED_OBJECT),
2075 errmsg(
"subscription \"%s\" does not exist",
name)));
2106 (
errcode(ERRCODE_UNDEFINED_OBJECT),
2107 errmsg(
"subscription with OID %u does not exist", subid)));
2132 bool copydata,
char *origin,
Oid *subrel_local_oids,
2133 int subrel_count,
char *
subname)
2138 Oid tableRow[1] = {TEXTOID};
2142 if (!copydata || !origin ||
2148 "SELECT DISTINCT P.pubname AS pubname\n"
2149 "FROM pg_publication P,\n"
2150 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2151 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
2152 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2153 "WHERE C.oid = GPT.relid AND P.pubname IN (");
2162 for (
i = 0;
i < subrel_count;
i++)
2164 Oid relid = subrel_local_oids[
i];
2168 appendStringInfo(&cmd,
"AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2169 schemaname, tablename);
2177 (
errcode(ERRCODE_CONNECTION_FAILURE),
2178 errmsg(
"could not receive list of replicated tables from the publisher: %s",
2213 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2214 errmsg(
"subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2216 errdetail_plural(
"The subscription being created subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2217 "The subscription being created subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2219 errhint(
"Verify that initial data copied from the publisher tables did not come from other origins."));
2254 tableRow[2] = INT2VECTOROID;
2272 " FROM pg_class c\n"
2273 " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2274 " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2275 " FROM pg_publication\n"
2276 " WHERE pubname IN ( %s )) AS gpt\n"
2277 " ON gpt.relid = c.oid\n",
2284 tableRow[2] = NAMEARRAYOID;
2288 if (check_columnlist)
2292 " WHERE t.pubname IN (");
2302 (
errcode(ERRCODE_CONNECTION_FAILURE),
2303 errmsg(
"could not receive list of replicated tables from the publisher: %s",
2322 if (check_columnlist &&
list_member(tablelist, rv))
2324 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2325 errmsg(
"cannot use different column lists for table \"%s.%s\" in different publications",
2328 tablelist =
lappend(tablelist, rv);
2349 foreach(lc, rstates)
2362 if (rstate->
state != SUBREL_STATE_SYNCDONE)
2367 sizeof(syncslotname));
2368 elog(
WARNING,
"could not drop tablesync replication slot \"%s\"",
2374 (
errcode(ERRCODE_CONNECTION_FAILURE),
2375 errmsg(
"could not connect to publisher when attempting to drop replication slot \"%s\": %s",
2378 errhint(
"Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
2379 "ALTER SUBSCRIPTION ... DISABLE",
2380 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
2394 foreach(cell, publist)
2399 foreach(pcell, publist)
2406 if (strcmp(
name, pname) == 0)
2409 errmsg(
"publication name \"%s\" used more than once",
2437 foreach(lc, newpublist)
2443 foreach(lc2, oldpublist)
2447 if (strcmp(
name, pubname) == 0)
2453 errmsg(
"publication \"%s\" is already in subscription \"%s\"",
2462 if (addpub && !found)
2464 else if (!addpub && !found)
2466 (
errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2467 errmsg(
"publication \"%s\" is not in subscription \"%s\"",
2477 (
errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2478 errmsg(
"cannot drop all the publications from a subscription")));
2534 (
errcode(ERRCODE_SYNTAX_ERROR),
2535 errmsg(
"%s requires a Boolean value or \"parallel\"",
bool has_privs_of_role(Oid member, Oid role)
void check_can_set_role(Oid member, Oid role)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
ArrayType * construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
void LogicalRepWorkersWakeupAtCommit(Oid subid)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define TextDatumGetCString(d)
#define Assert(condition)
#define OidIsValid(objectId)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
char * get_database_name(Oid dbid)
static void PGresult * res
bool defGetBoolean(DefElem *def)
char * defGetString(DefElem *def)
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
void load_file(const char *filename, bool restricted)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
#define DirectFunctionCall1(func, arg1)
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
if(TABLE==NULL||TABLE_index==NULL)
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
void logicalrep_worker_stop(Oid subid, Oid relid)
void ApplyLauncherWakeupAtCommit(void)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
List * lappend(List *list, void *datum)
List * list_copy(const List *oldlist)
List * list_append_unique(List *list, void *datum)
List * list_delete(List *list, void *datum)
void list_free(List *list)
bool list_member(const List *list, const void *datum)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
char * get_namespace_name(Oid nspid)
char get_rel_relkind(Oid relid)
Oid get_rel_namespace(Oid relid)
char * get_rel_name(Oid relid)
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
Datum namein(PG_FUNCTION_ARGS)
#define RangeVarGetRelid(relation, lockmode, missing_ok)
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define InvokeObjectDropHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
int oid_cmp(const void *p1, const void *p2)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_create(const char *roname)
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
static AmcheckOptions opts
static int server_version
static int list_length(const List *l)
#define foreach_delete_current(lst, var_or_cell)
Datum pg_lsn_in(PG_FUNCTION_ARGS)
static Datum LSNGetDatum(XLogRecPtr X)
static XLogRecPtr DatumGetLSN(Datum X)
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void RemoveSubscriptionRel(Oid subid, Oid relid)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
Subscription * GetSubscription(Oid subid, bool missing_ok)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
#define LOGICALREP_ORIGIN_NONE
#define LOGICALREP_STREAM_ON
#define LOGICALREP_ORIGIN_ANY
#define LOGICALREP_STREAM_OFF
#define LOGICALREP_STREAM_PARALLEL
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
FormData_pg_subscription * Form_pg_subscription
void pgstat_drop_subscription(Oid subid)
void pgstat_create_subscription(Oid subid)
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
static Datum PointerGetDatum(const void *X)
static Name DatumGetName(Datum X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static Datum CStringGetDatum(const char *X)
static Datum CharGetDatum(char X)
char * quote_literal_cstr(const char *rawstr)
Datum quote_literal(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(old_ctx)
#define RelationGetDescr(relation)
const char * quote_identifier(const char *ident)
bool ReplicationSlotValidateName(const char *name, int elevel)
#define ERRCODE_DUPLICATE_OBJECT
void destroyStringInfo(StringInfo str)
StringInfo makeStringInfo(void)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
char * synchronous_commit
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char defGetStreamingMode(DefElem *def)
#define SUBOPT_CREATE_SLOT
#define SUBOPT_PASSWORD_REQUIRED
ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
#define SUBOPT_SYNCHRONOUS_COMMIT
static void check_duplicates_in_publist(List *publist, Datum *datums)
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
static Datum publicationListToArray(List *publist)
static void get_publications_str(List *publications, StringInfo dest, bool quote_literal)
static void parse_subscription_options(ParseState *pstate, List *stmt_options, bits32 supported_opts, SubOpts *opts)
static void check_publications(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
#define SUBOPT_DISABLE_ON_ERR
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
bool superuser_arg(Oid roleid)
void ReleaseSysCache(HeapTuple tuple)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
#define SearchSysCacheCopy1(cacheId, key1)
#define SearchSysCacheCopy2(cacheId, key1, key2)
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
void UpdateTwoPhaseState(Oid suboid, char new_state)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
bool LookupGXactBySubid(Oid subid)
String * makeString(char *str)
static WalReceiverConn * wrconn
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr