26 #include "catalog/pg_authid_d.h"
27 #include "catalog/pg_database_d.h"
/*
 * Bit flags naming the individual subscription options.
 *
 * Every constant occupies a distinct single bit (0x1 through 0x8000), so
 * any combination of them can be OR-ed together into one mask and tested
 * bit by bit without ambiguity.  parse_subscription_options() receives such
 * a mask as its "supported_opts" argument and asserts that it is non-zero.
 *
 * NOTE(review): presumably each flag corresponds to the like-named option
 * string compared against defel->defname in parse_subscription_options()
 * (e.g. SUBOPT_CONNECT <-> "connect", SUBOPT_TWOPHASE_COMMIT <->
 * "two_phase", SUBOPT_DISABLE_ON_ERR <-> "disable_on_error") -- confirm
 * against the full function body when adding a new flag.
 *
 * When adding a flag, use the next unused bit so the values stay disjoint.
 */
59 #define SUBOPT_CONNECT 0x00000001
60 #define SUBOPT_ENABLED 0x00000002
61 #define SUBOPT_CREATE_SLOT 0x00000004
62 #define SUBOPT_SLOT_NAME 0x00000008
63 #define SUBOPT_COPY_DATA 0x00000010
64 #define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
65 #define SUBOPT_REFRESH 0x00000040
66 #define SUBOPT_BINARY 0x00000080
67 #define SUBOPT_STREAMING 0x00000100
68 #define SUBOPT_TWOPHASE_COMMIT 0x00000200
69 #define SUBOPT_DISABLE_ON_ERR 0x00000400
70 #define SUBOPT_PASSWORD_REQUIRED 0x00000800
71 #define SUBOPT_RUN_AS_OWNER 0x00001000
72 #define SUBOPT_FAILOVER 0x00002000
73 #define SUBOPT_LSN 0x00004000
74 #define SUBOPT_ORIGIN 0x00008000
/*
 * IsSet(val, bits)
 *		Evaluates to true only when ALL of the bits in "bits" are set in
 *		"val"; having just some of them set is not enough, because the
 *		masked value is compared against "bits" itself rather than against
 *		zero.  If "bits" is 0 the result is always true.
 *
 * Both arguments are fully parenthesized in the expansion, so arbitrary
 * expressions are safe with respect to operator precedence.  Note that
 * "bits" is expanded twice, so do not pass an expression with side effects
 * for it.
 */
77 #define IsSet(val, bits) (((val) & (bits)) == (bits))
106 List *publications,
bool copydata,
107 char *origin,
Oid *subrel_local_oids,
108 int subrel_count,
char *
subname);
130 Assert(supported_opts != 0);
139 opts->connect =
true;
141 opts->enabled =
true;
143 opts->create_slot =
true;
145 opts->copy_data =
true;
147 opts->refresh =
true;
149 opts->binary =
false;
153 opts->twophase =
false;
155 opts->disableonerr =
false;
157 opts->passwordrequired =
true;
159 opts->runasowner =
false;
161 opts->failover =
false;
166 foreach(lc, stmt_options)
171 strcmp(defel->
defname,
"connect") == 0)
180 strcmp(defel->
defname,
"enabled") == 0)
189 strcmp(defel->
defname,
"create_slot") == 0)
198 strcmp(defel->
defname,
"slot_name") == 0)
207 if (strcmp(
opts->slot_name,
"none") == 0)
208 opts->slot_name = NULL;
213 strcmp(defel->
defname,
"copy_data") == 0)
222 strcmp(defel->
defname,
"synchronous_commit") == 0)
236 strcmp(defel->
defname,
"refresh") == 0)
245 strcmp(defel->
defname,
"binary") == 0)
254 strcmp(defel->
defname,
"streaming") == 0)
262 else if (strcmp(defel->
defname,
"two_phase") == 0)
274 (
errcode(ERRCODE_SYNTAX_ERROR),
275 errmsg(
"unrecognized subscription parameter: \"%s\"", defel->
defname)));
284 strcmp(defel->
defname,
"disable_on_error") == 0)
293 strcmp(defel->
defname,
"password_required") == 0)
302 strcmp(defel->
defname,
"run_as_owner") == 0)
311 strcmp(defel->
defname,
"failover") == 0)
320 strcmp(defel->
defname,
"origin") == 0)
339 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
340 errmsg(
"unrecognized origin value: \"%s\"",
opts->origin));
343 strcmp(defel->
defname,
"lsn") == 0)
352 if (strcmp(lsn_str,
"none") == 0)
362 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
363 errmsg(
"invalid WAL location (LSN): %s", lsn_str)));
371 (
errcode(ERRCODE_SYNTAX_ERROR),
372 errmsg(
"unrecognized subscription parameter: \"%s\"", defel->
defname)));
385 (
errcode(ERRCODE_SYNTAX_ERROR),
387 errmsg(
"%s and %s are mutually exclusive options",
388 "connect = false",
"enabled = true")));
390 if (
opts->create_slot &&
393 (
errcode(ERRCODE_SYNTAX_ERROR),
394 errmsg(
"%s and %s are mutually exclusive options",
395 "connect = false",
"create_slot = true")));
397 if (
opts->copy_data &&
400 (
errcode(ERRCODE_SYNTAX_ERROR),
401 errmsg(
"%s and %s are mutually exclusive options",
402 "connect = false",
"copy_data = true")));
405 opts->enabled =
false;
406 opts->create_slot =
false;
407 opts->copy_data =
false;
414 if (!
opts->slot_name &&
421 (
errcode(ERRCODE_SYNTAX_ERROR),
423 errmsg(
"%s and %s are mutually exclusive options",
424 "slot_name = NONE",
"enabled = true")));
427 (
errcode(ERRCODE_SYNTAX_ERROR),
429 errmsg(
"subscription with %s must also set %s",
430 "slot_name = NONE",
"enabled = false")));
433 if (
opts->create_slot)
437 (
errcode(ERRCODE_SYNTAX_ERROR),
439 errmsg(
"%s and %s are mutually exclusive options",
440 "slot_name = NONE",
"create_slot = true")));
443 (
errcode(ERRCODE_SYNTAX_ERROR),
445 errmsg(
"subscription with %s must also set %s",
446 "slot_name = NONE",
"create_slot = false")));
462 foreach(lc, publications)
492 Oid tableRow[1] = {TEXTOID};
496 " pg_catalog.pg_publication t WHERE\n"
506 errmsg(
"could not receive list of publications from the publisher: %s",
509 publicationsCopy =
list_copy(publications);
537 errcode(ERRCODE_UNDEFINED_OBJECT),
538 errmsg_plural(
"publication %s does not exist on the publisher",
539 "publications %s do not exist on the publisher",
558 "publicationListToArray to array",
585 bool nulls[Natts_pg_subscription];
615 if (
opts.create_slot)
625 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
626 errmsg(
"permission denied to create subscription"),
627 errdetail(
"Only roles with privileges of the \"%s\" role may create subscriptions.",
628 "pg_create_subscription")));
647 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
648 errmsg(
"password_required=false is superuser-only"),
649 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
655 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
656 if (strncmp(
stmt->subname,
"regress_", 8) != 0)
657 elog(
WARNING,
"subscriptions created by regression test cases should have names starting with \"regress_\"");
669 errmsg(
"subscription \"%s\" already exists",
674 opts.slot_name == NULL)
678 if (
opts.synchronous_commit == NULL)
679 opts.synchronous_commit =
"off";
681 conninfo =
stmt->conninfo;
682 publications =
stmt->publication;
692 memset(nulls,
false,
sizeof(nulls));
695 Anum_pg_subscription_oid);
699 values[Anum_pg_subscription_subname - 1] =
705 values[Anum_pg_subscription_subtwophasestate - 1] =
713 values[Anum_pg_subscription_subconninfo - 1] =
716 values[Anum_pg_subscription_subslotname - 1] =
719 nulls[Anum_pg_subscription_subslotname - 1] =
true;
720 values[Anum_pg_subscription_subsynccommit - 1] =
722 values[Anum_pg_subscription_subpublications - 1] =
724 values[Anum_pg_subscription_suborigin - 1] =
749 bool must_use_password;
757 (
errcode(ERRCODE_CONNECTION_FAILURE),
758 errmsg(
"could not connect to the publisher: %s",
err)));
764 opts.origin, NULL, 0,
stmt->subname);
770 table_state =
opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
797 if (
opts.create_slot)
799 bool twophase_enabled =
false;
819 if (
opts.twophase && !
opts.copy_data && tables !=
NIL)
820 twophase_enabled =
true;
825 if (twophase_enabled)
829 (
errmsg(
"created replication slot \"%s\" on publisher",
841 (
errmsg(
"subscription was created, but is not connected"),
842 errhint(
"To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
860 List *validate_publications)
865 Oid *subrel_local_oids;
866 Oid *pubrel_local_oids;
872 typedef struct SubRemoveRels
877 SubRemoveRels *sub_remove_rels;
879 bool must_use_password;
890 (
errcode(ERRCODE_CONNECTION_FAILURE),
891 errmsg(
"could not connect to the publisher: %s",
err)));
895 if (validate_publications)
910 subrel_local_oids =
palloc(subrel_count *
sizeof(
Oid));
912 foreach(lc, subrel_states)
916 subrel_local_oids[off++] = relstate->
relid;
918 qsort(subrel_local_oids, subrel_count,
922 sub->
origin, subrel_local_oids,
923 subrel_count, sub->
name);
929 sub_remove_rels =
palloc(subrel_count *
sizeof(SubRemoveRels));
941 foreach(lc, pubrel_names)
952 pubrel_local_oids[off++] = relid;
954 if (!bsearch(&relid, subrel_local_oids,
958 copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
974 for (off = 0; off < subrel_count; off++)
976 Oid relid = subrel_local_oids[off];
978 if (!bsearch(&relid, pubrel_local_oids,
1004 sub_remove_rels[remove_rel_len].relid = relid;
1005 sub_remove_rels[remove_rel_len++].state =
state;
1015 if (
state != SUBREL_STATE_READY)
1030 sizeof(originname));
1047 for (off = 0; off < remove_rel_len; off++)
1049 if (sub_remove_rels[off].
state != SUBREL_STATE_READY &&
1050 sub_remove_rels[off].
state != SUBREL_STATE_SYNCDONE)
1065 syncslotname,
sizeof(syncslotname));
1089 bool nulls[Natts_pg_subscription];
1090 bool replaces[Natts_pg_subscription];
1094 bool update_tuple =
false;
1108 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1109 errmsg(
"subscription \"%s\" does not exist",
1128 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1129 errmsg(
"password_required=false is superuser-only"),
1130 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1137 memset(nulls,
false,
sizeof(nulls));
1138 memset(replaces,
false,
sizeof(replaces));
1152 supported_opts, &
opts);
1165 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1166 errmsg(
"cannot set %s for enabled subscription",
1167 "slot_name = NONE")));
1170 values[Anum_pg_subscription_subslotname - 1] =
1173 nulls[Anum_pg_subscription_subslotname - 1] =
true;
1174 replaces[Anum_pg_subscription_subslotname - 1] =
true;
1177 if (
opts.synchronous_commit)
1179 values[Anum_pg_subscription_subsynccommit - 1] =
1181 replaces[Anum_pg_subscription_subsynccommit - 1] =
true;
1186 values[Anum_pg_subscription_subbinary - 1] =
1188 replaces[Anum_pg_subscription_subbinary - 1] =
true;
1193 values[Anum_pg_subscription_substream - 1] =
1195 replaces[Anum_pg_subscription_substream - 1] =
true;
1200 values[Anum_pg_subscription_subdisableonerr - 1]
1202 replaces[Anum_pg_subscription_subdisableonerr - 1]
1211 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1212 errmsg(
"password_required=false is superuser-only"),
1213 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1215 values[Anum_pg_subscription_subpasswordrequired - 1]
1217 replaces[Anum_pg_subscription_subpasswordrequired - 1]
1223 values[Anum_pg_subscription_subrunasowner - 1] =
1225 replaces[Anum_pg_subscription_subrunasowner - 1] =
true;
1232 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1233 errmsg(
"cannot set %s for a subscription that does not have a slot name",
1244 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1245 errmsg(
"cannot set %s for enabled subscription",
1254 values[Anum_pg_subscription_subfailover - 1] =
1256 replaces[Anum_pg_subscription_subfailover - 1] =
true;
1261 values[Anum_pg_subscription_suborigin - 1] =
1263 replaces[Anum_pg_subscription_suborigin - 1] =
true;
1266 update_tuple =
true;
1278 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1279 errmsg(
"cannot enable subscription that does not have a slot name")));
1281 values[Anum_pg_subscription_subenabled - 1] =
1283 replaces[Anum_pg_subscription_subenabled - 1] =
true;
1288 update_tuple =
true;
1299 values[Anum_pg_subscription_subconninfo - 1] =
1301 replaces[Anum_pg_subscription_subconninfo - 1] =
true;
1302 update_tuple =
true;
1309 supported_opts, &
opts);
1311 values[Anum_pg_subscription_subpublications - 1] =
1313 replaces[Anum_pg_subscription_subpublications - 1] =
true;
1315 update_tuple =
true;
1322 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1323 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1324 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1332 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1333 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1334 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1356 supported_opts, &
opts);
1359 values[Anum_pg_subscription_subpublications - 1] =
1361 replaces[Anum_pg_subscription_subpublications - 1] =
true;
1363 update_tuple =
true;
1369 List *validate_publications = (isadd) ?
stmt->publication : NULL;
1373 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1374 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1378 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1379 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1387 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1388 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1390 errhint(
"Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1392 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1393 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1401 validate_publications);
1411 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1412 errmsg(
"ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1436 (
errcode(ERRCODE_SYNTAX_ERROR),
1437 errmsg(
"ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1438 errhint(
"Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1465 originname,
sizeof(originname));
1472 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1473 errmsg(
"skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
1479 replaces[Anum_pg_subscription_subskiplsn - 1] =
true;
1481 update_tuple =
true;
1486 elog(
ERROR,
"unrecognized ALTER SUBSCRIPTION kind %d",
1508 if (replaces[Anum_pg_subscription_subfailover - 1])
1510 bool must_use_password;
1523 (
errcode(ERRCODE_CONNECTION_FAILURE),
1524 errmsg(
"could not connect to the publisher: %s",
err)));
1572 bool must_use_password;
1587 if (!
stmt->missing_ok)
1589 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1590 errmsg(
"subscription \"%s\" does not exist",
1594 (
errmsg(
"subscription \"%s\" does not exist, skipping",
1602 subowner = form->subowner;
1603 must_use_password = !
superuser_arg(subowner) && form->subpasswordrequired;
1621 Anum_pg_subscription_subname);
1626 Anum_pg_subscription_subconninfo);
1631 Anum_pg_subscription_subslotname, &isnull);
1678 foreach(lc, subworkers)
1706 foreach(lc, rstates)
1723 sizeof(originname));
1747 if (!slotname && rstates ==
NIL)
1785 foreach(lc, rstates)
1806 if (rstate->
state != SUBREL_STATE_SYNCDONE)
1811 sizeof(syncslotname));
1863 (
errmsg(
"dropped replication slot \"%s\" on publisher",
1868 res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
1872 (
errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
1873 slotname,
res->err)));
1879 (
errcode(ERRCODE_CONNECTION_FAILURE),
1880 errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
1881 slotname,
res->err)));
1904 if (form->subowner == newOwnerId)
1915 if (!form->subpasswordrequired && !
superuser())
1917 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1918 errmsg(
"password_required=false is superuser-only"),
1919 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1937 form->subowner = newOwnerId;
1972 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1973 errmsg(
"subscription \"%s\" does not exist",
name)));
2004 (
errcode(ERRCODE_UNDEFINED_OBJECT),
2005 errmsg(
"subscription with OID %u does not exist", subid)));
2030 bool copydata,
char *origin,
Oid *subrel_local_oids,
2031 int subrel_count,
char *
subname)
2036 Oid tableRow[1] = {TEXTOID};
2040 if (!copydata || !origin ||
2046 "SELECT DISTINCT P.pubname AS pubname\n"
2047 "FROM pg_publication P,\n"
2048 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2049 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
2050 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2051 "WHERE C.oid = GPT.relid AND P.pubname IN (");
2060 for (
i = 0;
i < subrel_count;
i++)
2062 Oid relid = subrel_local_oids[
i];
2066 appendStringInfo(&cmd,
"AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2067 schemaname, tablename);
2075 (
errcode(ERRCODE_CONNECTION_FAILURE),
2076 errmsg(
"could not receive list of replicated tables from the publisher: %s",
2111 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2112 errmsg(
"subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2114 errdetail_plural(
"The subscription being created subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2115 "The subscription being created subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2117 errhint(
"Verify that initial data copied from the publisher tables did not come from other origins."));
2152 tableRow[2] = INT2VECTOROID;
2170 " FROM pg_class c\n"
2171 " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2172 " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2173 " FROM pg_publication\n"
2174 " WHERE pubname IN ( %s )) AS gpt\n"
2175 " ON gpt.relid = c.oid\n",
2182 tableRow[2] = NAMEARRAYOID;
2186 if (check_columnlist)
2190 " WHERE t.pubname IN (");
2200 (
errcode(ERRCODE_CONNECTION_FAILURE),
2201 errmsg(
"could not receive list of replicated tables from the publisher: %s",
2220 if (check_columnlist &&
list_member(tablelist, rv))
2222 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2223 errmsg(
"cannot use different column lists for table \"%s.%s\" in different publications",
2226 tablelist =
lappend(tablelist, rv);
2247 foreach(lc, rstates)
2260 if (rstate->
state != SUBREL_STATE_SYNCDONE)
2265 sizeof(syncslotname));
2266 elog(
WARNING,
"could not drop tablesync replication slot \"%s\"",
2272 (
errcode(ERRCODE_CONNECTION_FAILURE),
2273 errmsg(
"could not connect to publisher when attempting to drop replication slot \"%s\": %s",
2276 errhint(
"Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
2277 "ALTER SUBSCRIPTION ... DISABLE",
2278 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
2292 foreach(cell, publist)
2297 foreach(pcell, publist)
2304 if (strcmp(
name, pname) == 0)
2307 errmsg(
"publication name \"%s\" used more than once",
2335 foreach(lc, newpublist)
2341 foreach(lc2, oldpublist)
2345 if (strcmp(
name, pubname) == 0)
2351 errmsg(
"publication \"%s\" is already in subscription \"%s\"",
2360 if (addpub && !found)
2362 else if (!addpub && !found)
2364 (
errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2365 errmsg(
"publication \"%s\" is not in subscription \"%s\"",
2375 (
errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2376 errmsg(
"cannot drop all the publications from a subscription")));
2432 (
errcode(ERRCODE_SYNTAX_ERROR),
2433 errmsg(
"%s requires a Boolean value or \"parallel\"",
bool has_privs_of_role(Oid member, Oid role)
void check_can_set_role(Oid member, Oid role)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
ArrayType * construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
void LogicalRepWorkersWakeupAtCommit(Oid subid)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define TextDatumGetCString(d)
#define Assert(condition)
#define OidIsValid(objectId)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
char * get_database_name(Oid dbid)
static void PGresult * res
bool defGetBoolean(DefElem *def)
char * defGetString(DefElem *def)
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
void load_file(const char *filename, bool restricted)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
#define DirectFunctionCall1(func, arg1)
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
if(TABLE==NULL||TABLE_index==NULL)
List * logicalrep_workers_find(Oid subid, bool only_running)
void logicalrep_worker_stop(Oid subid, Oid relid)
void ApplyLauncherWakeupAtCommit(void)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
List * lappend(List *list, void *datum)
List * list_copy(const List *oldlist)
List * list_append_unique(List *list, void *datum)
List * list_delete(List *list, void *datum)
void list_free(List *list)
bool list_member(const List *list, const void *datum)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
char * get_namespace_name(Oid nspid)
char get_rel_relkind(Oid relid)
Oid get_rel_namespace(Oid relid)
char * get_rel_name(Oid relid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
Datum namein(PG_FUNCTION_ARGS)
#define RangeVarGetRelid(relation, lockmode, missing_ok)
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define InvokeObjectDropHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
int oid_cmp(const void *p1, const void *p2)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_create(const char *roname)
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
static AmcheckOptions opts
static int server_version
static int list_length(const List *l)
#define foreach_delete_current(lst, var_or_cell)
Datum pg_lsn_in(PG_FUNCTION_ARGS)
static Datum LSNGetDatum(XLogRecPtr X)
static XLogRecPtr DatumGetLSN(Datum X)
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void RemoveSubscriptionRel(Oid subid, Oid relid)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
Subscription * GetSubscription(Oid subid, bool missing_ok)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
#define LOGICALREP_ORIGIN_NONE
#define LOGICALREP_STREAM_ON
#define LOGICALREP_ORIGIN_ANY
#define LOGICALREP_STREAM_OFF
#define LOGICALREP_STREAM_PARALLEL
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
FormData_pg_subscription * Form_pg_subscription
void pgstat_drop_subscription(Oid subid)
void pgstat_create_subscription(Oid subid)
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
static Datum PointerGetDatum(const void *X)
static Name DatumGetName(Datum X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static Datum CStringGetDatum(const char *X)
static Datum CharGetDatum(char X)
char * quote_literal_cstr(const char *rawstr)
Datum quote_literal(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(old_ctx)
#define RelationGetDescr(relation)
const char * quote_identifier(const char *ident)
bool ReplicationSlotValidateName(const char *name, int elevel)
#define ERRCODE_DUPLICATE_OBJECT
void destroyStringInfo(StringInfo str)
StringInfo makeStringInfo(void)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
char * synchronous_commit
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char defGetStreamingMode(DefElem *def)
#define SUBOPT_CREATE_SLOT
#define SUBOPT_PASSWORD_REQUIRED
ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
#define SUBOPT_SYNCHRONOUS_COMMIT
static void check_duplicates_in_publist(List *publist, Datum *datums)
static Datum publicationListToArray(List *publist)
static void get_publications_str(List *publications, StringInfo dest, bool quote_literal)
static void parse_subscription_options(ParseState *pstate, List *stmt_options, bits32 supported_opts, SubOpts *opts)
static void check_publications(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
#define SUBOPT_DISABLE_ON_ERR
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
bool superuser_arg(Oid roleid)
void ReleaseSysCache(HeapTuple tuple)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
#define SearchSysCacheCopy1(cacheId, key1)
#define SearchSysCacheCopy2(cacheId, key1, key2)
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
void UpdateTwoPhaseState(Oid suboid, char new_state)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
String * makeString(char *str)
static WalReceiverConn * wrconn
#define walrcv_alter_slot(conn, slotname, failover)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr