27#include "catalog/pg_authid_d.h"
28#include "catalog/pg_database_d.h"
60#define SUBOPT_CONNECT 0x00000001
61#define SUBOPT_ENABLED 0x00000002
62#define SUBOPT_CREATE_SLOT 0x00000004
63#define SUBOPT_SLOT_NAME 0x00000008
64#define SUBOPT_COPY_DATA 0x00000010
65#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
66#define SUBOPT_REFRESH 0x00000040
67#define SUBOPT_BINARY 0x00000080
68#define SUBOPT_STREAMING 0x00000100
69#define SUBOPT_TWOPHASE_COMMIT 0x00000200
70#define SUBOPT_DISABLE_ON_ERR 0x00000400
71#define SUBOPT_PASSWORD_REQUIRED 0x00000800
72#define SUBOPT_RUN_AS_OWNER 0x00001000
73#define SUBOPT_FAILOVER 0x00002000
74#define SUBOPT_LSN 0x00004000
75#define SUBOPT_ORIGIN 0x00008000
78#define IsSet(val, bits) (((val) & (bits)) == (bits))
107 List *publications,
bool copydata,
108 char *origin,
Oid *subrel_local_oids,
109 int subrel_count,
char *
subname);
114 bool slot_needs_update,
bool isTopLevel);
133 Assert(supported_opts != 0);
142 opts->connect =
true;
144 opts->enabled =
true;
146 opts->create_slot =
true;
148 opts->copy_data =
true;
150 opts->refresh =
true;
152 opts->binary =
false;
154 opts->streaming = LOGICALREP_STREAM_PARALLEL;
156 opts->twophase =
false;
158 opts->disableonerr =
false;
160 opts->passwordrequired =
true;
162 opts->runasowner =
false;
164 opts->failover =
false;
169 foreach(lc, stmt_options)
174 strcmp(defel->
defname,
"connect") == 0)
183 strcmp(defel->
defname,
"enabled") == 0)
192 strcmp(defel->
defname,
"create_slot") == 0)
201 strcmp(defel->
defname,
"slot_name") == 0)
210 if (strcmp(
opts->slot_name,
"none") == 0)
211 opts->slot_name = NULL;
216 strcmp(defel->
defname,
"copy_data") == 0)
225 strcmp(defel->
defname,
"synchronous_commit") == 0)
239 strcmp(defel->
defname,
"refresh") == 0)
248 strcmp(defel->
defname,
"binary") == 0)
257 strcmp(defel->
defname,
"streaming") == 0)
266 strcmp(defel->
defname,
"two_phase") == 0)
275 strcmp(defel->
defname,
"disable_on_error") == 0)
284 strcmp(defel->
defname,
"password_required") == 0)
293 strcmp(defel->
defname,
"run_as_owner") == 0)
302 strcmp(defel->
defname,
"failover") == 0)
311 strcmp(defel->
defname,
"origin") == 0)
330 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
331 errmsg(
"unrecognized origin value: \"%s\"",
opts->origin));
334 strcmp(defel->
defname,
"lsn") == 0)
343 if (strcmp(lsn_str,
"none") == 0)
353 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
354 errmsg(
"invalid WAL location (LSN): %s", lsn_str)));
362 (
errcode(ERRCODE_SYNTAX_ERROR),
363 errmsg(
"unrecognized subscription parameter: \"%s\"", defel->
defname)));
376 (
errcode(ERRCODE_SYNTAX_ERROR),
378 errmsg(
"%s and %s are mutually exclusive options",
379 "connect = false",
"enabled = true")));
381 if (
opts->create_slot &&
384 (
errcode(ERRCODE_SYNTAX_ERROR),
385 errmsg(
"%s and %s are mutually exclusive options",
386 "connect = false",
"create_slot = true")));
388 if (
opts->copy_data &&
391 (
errcode(ERRCODE_SYNTAX_ERROR),
392 errmsg(
"%s and %s are mutually exclusive options",
393 "connect = false",
"copy_data = true")));
396 opts->enabled =
false;
397 opts->create_slot =
false;
398 opts->copy_data =
false;
405 if (!
opts->slot_name &&
412 (
errcode(ERRCODE_SYNTAX_ERROR),
414 errmsg(
"%s and %s are mutually exclusive options",
415 "slot_name = NONE",
"enabled = true")));
418 (
errcode(ERRCODE_SYNTAX_ERROR),
420 errmsg(
"subscription with %s must also set %s",
421 "slot_name = NONE",
"enabled = false")));
424 if (
opts->create_slot)
428 (
errcode(ERRCODE_SYNTAX_ERROR),
430 errmsg(
"%s and %s are mutually exclusive options",
431 "slot_name = NONE",
"create_slot = true")));
434 (
errcode(ERRCODE_SYNTAX_ERROR),
436 errmsg(
"subscription with %s must also set %s",
437 "slot_name = NONE",
"create_slot = false")));
452 Oid tableRow[1] = {TEXTOID};
456 " pg_catalog.pg_publication t WHERE\n"
466 errmsg(
"could not receive list of publications from the publisher: %s",
469 publicationsCopy =
list_copy(publications);
497 errcode(ERRCODE_UNDEFINED_OBJECT),
498 errmsg_plural(
"publication %s does not exist on the publisher",
499 "publications %s do not exist on the publisher",
518 "publicationListToArray to array",
545 bool nulls[Natts_pg_subscription];
575 if (
opts.create_slot)
585 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
586 errmsg(
"permission denied to create subscription"),
587 errdetail(
"Only roles with privileges of the \"%s\" role may create subscriptions.",
588 "pg_create_subscription")));
607 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
608 errmsg(
"password_required=false is superuser-only"),
609 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
615#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
616 if (strncmp(
stmt->subname,
"regress_", 8) != 0)
617 elog(
WARNING,
"subscriptions created by regression test cases should have names starting with \"regress_\"");
629 errmsg(
"subscription \"%s\" already exists",
634 opts.slot_name == NULL)
638 if (
opts.synchronous_commit == NULL)
639 opts.synchronous_commit =
"off";
641 conninfo =
stmt->conninfo;
642 publications =
stmt->publication;
652 memset(nulls,
false,
sizeof(nulls));
655 Anum_pg_subscription_oid);
659 values[Anum_pg_subscription_subname - 1] =
665 values[Anum_pg_subscription_subtwophasestate - 1] =
667 LOGICALREP_TWOPHASE_STATE_PENDING :
668 LOGICALREP_TWOPHASE_STATE_DISABLED);
673 values[Anum_pg_subscription_subconninfo - 1] =
676 values[Anum_pg_subscription_subslotname - 1] =
679 nulls[Anum_pg_subscription_subslotname - 1] =
true;
680 values[Anum_pg_subscription_subsynccommit - 1] =
682 values[Anum_pg_subscription_subpublications - 1] =
684 values[Anum_pg_subscription_suborigin - 1] =
709 bool must_use_password;
717 (
errcode(ERRCODE_CONNECTION_FAILURE),
718 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
725 opts.origin, NULL, 0,
stmt->subname);
731 table_state =
opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
758 if (
opts.create_slot)
760 bool twophase_enabled =
false;
780 if (
opts.twophase && !
opts.copy_data && tables !=
NIL)
781 twophase_enabled =
true;
786 if (twophase_enabled)
790 (
errmsg(
"created replication slot \"%s\" on publisher",
802 (
errmsg(
"subscription was created, but is not connected"),
803 errhint(
"To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
821 List *validate_publications)
826 Oid *subrel_local_oids;
827 Oid *pubrel_local_oids;
833 typedef struct SubRemoveRels
838 SubRemoveRels *sub_remove_rels;
840 bool must_use_password;
851 (
errcode(ERRCODE_CONNECTION_FAILURE),
852 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
857 if (validate_publications)
872 subrel_local_oids =
palloc(subrel_count *
sizeof(
Oid));
874 foreach(lc, subrel_states)
878 subrel_local_oids[off++] = relstate->
relid;
880 qsort(subrel_local_oids, subrel_count,
884 sub->
origin, subrel_local_oids,
885 subrel_count, sub->
name);
891 sub_remove_rels =
palloc(subrel_count *
sizeof(SubRemoveRels));
903 foreach(lc, pubrel_names)
914 pubrel_local_oids[off++] = relid;
916 if (!bsearch(&relid, subrel_local_oids,
920 copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
936 for (off = 0; off < subrel_count; off++)
938 Oid relid = subrel_local_oids[off];
940 if (!bsearch(&relid, pubrel_local_oids,
966 sub_remove_rels[remove_rel_len].relid = relid;
967 sub_remove_rels[remove_rel_len++].state =
state;
977 if (
state != SUBREL_STATE_READY)
1009 for (off = 0; off < remove_rel_len; off++)
1011 if (sub_remove_rels[off].
state != SUBREL_STATE_READY &&
1012 sub_remove_rels[off].
state != SUBREL_STATE_SYNCDONE)
1027 syncslotname,
sizeof(syncslotname));
1047 bool slot_needs_update,
bool isTopLevel)
1054 strcmp(
option,
"two_phase") == 0);
1069 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1070 errmsg(
"cannot set option \"%s\" for enabled subscription",
1073 if (slot_needs_update)
1083 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1084 errmsg(
"cannot set option \"%s\" for a subscription that does not have a slot name",
1105 bool nulls[Natts_pg_subscription];
1106 bool replaces[Natts_pg_subscription];
1110 bool update_tuple =
false;
1111 bool update_failover =
false;
1112 bool update_two_phase =
false;
1126 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1127 errmsg(
"subscription \"%s\" does not exist",
1146 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1147 errmsg(
"password_required=false is superuser-only"),
1148 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1155 memset(nulls,
false,
sizeof(nulls));
1156 memset(replaces,
false,
sizeof(replaces));
1171 supported_opts, &
opts);
1184 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1185 errmsg(
"cannot set %s for enabled subscription",
1186 "slot_name = NONE")));
1189 values[Anum_pg_subscription_subslotname - 1] =
1192 nulls[Anum_pg_subscription_subslotname - 1] =
true;
1193 replaces[Anum_pg_subscription_subslotname - 1] =
true;
1196 if (
opts.synchronous_commit)
1198 values[Anum_pg_subscription_subsynccommit - 1] =
1200 replaces[Anum_pg_subscription_subsynccommit - 1] =
true;
1205 values[Anum_pg_subscription_subbinary - 1] =
1207 replaces[Anum_pg_subscription_subbinary - 1] =
true;
1212 values[Anum_pg_subscription_substream - 1] =
1214 replaces[Anum_pg_subscription_substream - 1] =
true;
1219 values[Anum_pg_subscription_subdisableonerr - 1]
1221 replaces[Anum_pg_subscription_subdisableonerr - 1]
1230 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1231 errmsg(
"password_required=false is superuser-only"),
1232 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1234 values[Anum_pg_subscription_subpasswordrequired - 1]
1236 replaces[Anum_pg_subscription_subpasswordrequired - 1]
1242 values[Anum_pg_subscription_subrunasowner - 1] =
1244 replaces[Anum_pg_subscription_subrunasowner - 1] =
true;
1256 update_two_phase = !
opts.twophase;
1266 if (update_two_phase &&
1269 (
errcode(ERRCODE_SYNTAX_ERROR),
1270 errmsg(
"slot_name and two_phase cannot be altered at the same time")));
1285 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1286 errmsg(
"cannot alter two_phase when logical replication worker is still running"),
1287 errhint(
"Try again after some time.")));
1295 if (update_two_phase &&
1299 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1300 errmsg(
"cannot disable two_phase when prepared transactions are present"),
1301 errhint(
"Resolve these transactions and try again.")));
1304 values[Anum_pg_subscription_subtwophasestate - 1] =
1306 LOGICALREP_TWOPHASE_STATE_PENDING :
1307 LOGICALREP_TWOPHASE_STATE_DISABLED);
1308 replaces[Anum_pg_subscription_subtwophasestate - 1] =
true;
1318 update_failover =
true;
1323 values[Anum_pg_subscription_subfailover - 1] =
1325 replaces[Anum_pg_subscription_subfailover - 1] =
true;
1330 values[Anum_pg_subscription_suborigin - 1] =
1332 replaces[Anum_pg_subscription_suborigin - 1] =
true;
1335 update_tuple =
true;
1347 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1348 errmsg(
"cannot enable subscription that does not have a slot name")));
1350 values[Anum_pg_subscription_subenabled - 1] =
1352 replaces[Anum_pg_subscription_subenabled - 1] =
true;
1357 update_tuple =
true;
1368 values[Anum_pg_subscription_subconninfo - 1] =
1370 replaces[Anum_pg_subscription_subconninfo - 1] =
true;
1371 update_tuple =
true;
1378 supported_opts, &
opts);
1380 values[Anum_pg_subscription_subpublications - 1] =
1382 replaces[Anum_pg_subscription_subpublications - 1] =
true;
1384 update_tuple =
true;
1391 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1392 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1393 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1401 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1402 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1403 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1425 supported_opts, &
opts);
1428 values[Anum_pg_subscription_subpublications - 1] =
1430 replaces[Anum_pg_subscription_subpublications - 1] =
true;
1432 update_tuple =
true;
1438 List *validate_publications = (isadd) ?
stmt->publication : NULL;
1442 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1443 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1447 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1448 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1456 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1457 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1459 errhint(
"Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1461 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1462 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1470 validate_publications);
1480 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1481 errmsg(
"ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1505 (
errcode(ERRCODE_SYNTAX_ERROR),
1506 errmsg(
"ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1507 errhint(
"Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1534 originname,
sizeof(originname));
1541 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1542 errmsg(
"skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
1548 replaces[Anum_pg_subscription_subskiplsn - 1] =
true;
1550 update_tuple =
true;
1555 elog(
ERROR,
"unrecognized ALTER SUBSCRIPTION kind %d",
1578 if (update_failover || update_two_phase)
1580 bool must_use_password;
1593 (
errcode(ERRCODE_CONNECTION_FAILURE),
1594 errmsg(
"subscription \"%s\" could not connect to the publisher: %s",
1600 update_failover ? &
opts.failover : NULL,
1601 update_two_phase ? &
opts.twophase : NULL);
1645 bool must_use_password;
1660 if (!
stmt->missing_ok)
1662 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1663 errmsg(
"subscription \"%s\" does not exist",
1667 (
errmsg(
"subscription \"%s\" does not exist, skipping",
1675 subowner = form->subowner;
1676 must_use_password = !
superuser_arg(subowner) && form->subpasswordrequired;
1694 Anum_pg_subscription_subname);
1699 Anum_pg_subscription_subconninfo);
1704 Anum_pg_subscription_subslotname, &isnull);
1749 foreach(lc, subworkers)
1777 foreach(lc, rstates)
1794 sizeof(originname));
1818 if (!slotname && rstates ==
NIL)
1856 foreach(lc, rstates)
1877 if (rstate->
state != SUBREL_STATE_SYNCDONE)
1882 sizeof(syncslotname));
1934 (
errmsg(
"dropped replication slot \"%s\" on publisher",
1939 res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
1943 (
errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
1944 slotname,
res->err)));
1950 (
errcode(ERRCODE_CONNECTION_FAILURE),
1951 errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
1952 slotname,
res->err)));
1975 if (form->subowner == newOwnerId)
1986 if (!form->subpasswordrequired && !
superuser())
1988 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1989 errmsg(
"password_required=false is superuser-only"),
1990 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2008 form->subowner = newOwnerId;
2043 (
errcode(ERRCODE_UNDEFINED_OBJECT),
2044 errmsg(
"subscription \"%s\" does not exist",
name)));
2075 (
errcode(ERRCODE_UNDEFINED_OBJECT),
2076 errmsg(
"subscription with OID %u does not exist", subid)));
2101 bool copydata,
char *origin,
Oid *subrel_local_oids,
2102 int subrel_count,
char *
subname)
2107 Oid tableRow[1] = {TEXTOID};
2111 if (!copydata || !origin ||
2117 "SELECT DISTINCT P.pubname AS pubname\n"
2118 "FROM pg_publication P,\n"
2119 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2120 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
2121 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2122 "WHERE C.oid = GPT.relid AND P.pubname IN (");
2131 for (
i = 0;
i < subrel_count;
i++)
2133 Oid relid = subrel_local_oids[
i];
2137 appendStringInfo(&cmd,
"AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2138 schemaname, tablename);
2146 (
errcode(ERRCODE_CONNECTION_FAILURE),
2147 errmsg(
"could not receive list of replicated tables from the publisher: %s",
2182 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2183 errmsg(
"subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2185 errdetail_plural(
"The subscription being created subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2186 "The subscription being created subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2188 errhint(
"Verify that initial data copied from the publisher tables did not come from other origins."));
2225 tableRow[2] = INT2VECTOROID;
2241 " FROM pg_class c\n"
2242 " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2243 " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2244 " FROM pg_publication\n"
2245 " WHERE pubname IN ( %s )) AS gpt\n"
2246 " ON gpt.relid = c.oid\n",
2251 tableRow[2] = NAMEARRAYOID;
2255 if (check_columnlist)
2259 " WHERE t.pubname IN ( %s )",
2270 (
errcode(ERRCODE_CONNECTION_FAILURE),
2271 errmsg(
"could not receive list of replicated tables from the publisher: %s",
2290 if (check_columnlist &&
list_member(tablelist, rv))
2292 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2293 errmsg(
"cannot use different column lists for table \"%s.%s\" in different publications",
2296 tablelist =
lappend(tablelist, rv);
2317 foreach(lc, rstates)
2330 if (rstate->
state != SUBREL_STATE_SYNCDONE)
2335 sizeof(syncslotname));
2336 elog(
WARNING,
"could not drop tablesync replication slot \"%s\"",
2342 (
errcode(ERRCODE_CONNECTION_FAILURE),
2343 errmsg(
"could not connect to publisher when attempting to drop replication slot \"%s\": %s",
2346 errhint(
"Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
2347 "ALTER SUBSCRIPTION ... DISABLE",
2348 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
2362 foreach(cell, publist)
2367 foreach(pcell, publist)
2374 if (strcmp(
name, pname) == 0)
2377 errmsg(
"publication name \"%s\" used more than once",
2405 foreach(lc, newpublist)
2411 foreach(lc2, oldpublist)
2415 if (strcmp(
name, pubname) == 0)
2421 errmsg(
"publication \"%s\" is already in subscription \"%s\"",
2430 if (addpub && !found)
2432 else if (!addpub && !found)
2434 (
errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2435 errmsg(
"publication \"%s\" is not in subscription \"%s\"",
2445 (
errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2446 errmsg(
"cannot drop all the publications from a subscription")));
2462 return LOGICALREP_STREAM_ON;
2473 return LOGICALREP_STREAM_OFF;
2475 return LOGICALREP_STREAM_ON;
2491 return LOGICALREP_STREAM_OFF;
2494 return LOGICALREP_STREAM_ON;
2496 return LOGICALREP_STREAM_PARALLEL;
2502 (
errcode(ERRCODE_SYNTAX_ERROR),
2503 errmsg(
"%s requires a Boolean value or \"parallel\"",
2505 return LOGICALREP_STREAM_OFF;
bool has_privs_of_role(Oid member, Oid role)
void check_can_set_role(Oid member, Oid role)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
ArrayType * construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
void LogicalRepWorkersWakeupAtCommit(Oid subid)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define TextDatumGetCString(d)
#define Assert(condition)
#define OidIsValid(objectId)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
char * get_database_name(Oid dbid)
static void PGresult * res
char * defGetString(DefElem *def)
bool defGetBoolean(DefElem *def)
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
void load_file(const char *filename, bool restricted)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
#define DirectFunctionCall1(func, arg1)
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
if(TABLE==NULL||TABLE_index==NULL)
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
void logicalrep_worker_stop(Oid subid, Oid relid)
void ApplyLauncherWakeupAtCommit(void)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
List * lappend(List *list, void *datum)
List * list_delete(List *list, void *datum)
List * list_append_unique(List *list, void *datum)
List * list_copy(const List *oldlist)
void list_free(List *list)
bool list_member(const List *list, const void *datum)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
char * get_rel_name(Oid relid)
char get_rel_relkind(Oid relid)
Oid get_rel_namespace(Oid relid)
char * get_namespace_name(Oid nspid)
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
Datum namein(PG_FUNCTION_ARGS)
#define RangeVarGetRelid(relation, lockmode, missing_ok)
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define InvokeObjectDropHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
int oid_cmp(const void *p1, const void *p2)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_create(const char *roname)
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
static AmcheckOptions opts
static int server_version
static int list_length(const List *l)
#define foreach_delete_current(lst, var_or_cell)
Datum pg_lsn_in(PG_FUNCTION_ARGS)
static Datum LSNGetDatum(XLogRecPtr X)
static XLogRecPtr DatumGetLSN(Datum X)
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
void RemoveSubscriptionRel(Oid subid, Oid relid)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
Subscription * GetSubscription(Oid subid, bool missing_ok)
FormData_pg_subscription * Form_pg_subscription
void pgstat_drop_subscription(Oid subid)
void pgstat_create_subscription(Oid subid)
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
static Datum PointerGetDatum(const void *X)
static Name DatumGetName(Datum X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static Datum CStringGetDatum(const char *X)
static Datum CharGetDatum(char X)
MemoryContextSwitchTo(old_ctx)
#define RelationGetDescr(relation)
const char * quote_identifier(const char *ident)
bool ReplicationSlotValidateName(const char *name, int elevel)
#define ERRCODE_DUPLICATE_OBJECT
void destroyStringInfo(StringInfo str)
StringInfo makeStringInfo(void)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
char * synchronous_commit
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char defGetStreamingMode(DefElem *def)
#define SUBOPT_CREATE_SLOT
#define SUBOPT_PASSWORD_REQUIRED
ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
#define SUBOPT_SYNCHRONOUS_COMMIT
static void check_duplicates_in_publist(List *publist, Datum *datums)
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
static Datum publicationListToArray(List *publist)
static void parse_subscription_options(ParseState *pstate, List *stmt_options, bits32 supported_opts, SubOpts *opts)
static void check_publications(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
#define SUBOPT_DISABLE_ON_ERR
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
bool superuser_arg(Oid roleid)
void ReleaseSysCache(HeapTuple tuple)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
#define SearchSysCacheCopy1(cacheId, key1)
#define SearchSysCacheCopy2(cacheId, key1, key2)
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
void UpdateTwoPhaseState(Oid suboid, char new_state)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
bool LookupGXactBySubid(Oid subid)
String * makeString(char *str)
static WalReceiverConn * wrconn
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr