26 #include "catalog/pg_authid_d.h"
27 #include "catalog/pg_database_d.h"
59 #define SUBOPT_CONNECT 0x00000001
60 #define SUBOPT_ENABLED 0x00000002
61 #define SUBOPT_CREATE_SLOT 0x00000004
62 #define SUBOPT_SLOT_NAME 0x00000008
63 #define SUBOPT_COPY_DATA 0x00000010
64 #define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
65 #define SUBOPT_REFRESH 0x00000040
66 #define SUBOPT_BINARY 0x00000080
67 #define SUBOPT_STREAMING 0x00000100
68 #define SUBOPT_TWOPHASE_COMMIT 0x00000200
69 #define SUBOPT_DISABLE_ON_ERR 0x00000400
70 #define SUBOPT_PASSWORD_REQUIRED 0x00000800
71 #define SUBOPT_RUN_AS_OWNER 0x00001000
72 #define SUBOPT_LSN 0x00002000
73 #define SUBOPT_ORIGIN 0x00004000
76 #define IsSet(val, bits) (((val) & (bits)) == (bits))
104 List *publications,
bool copydata,
105 char *origin,
Oid *subrel_local_oids,
106 int subrel_count,
char *
subname);
128 Assert(supported_opts != 0);
137 opts->connect =
true;
139 opts->enabled =
true;
141 opts->create_slot =
true;
143 opts->copy_data =
true;
145 opts->refresh =
true;
147 opts->binary =
false;
151 opts->twophase =
false;
153 opts->disableonerr =
false;
155 opts->passwordrequired =
true;
157 opts->runasowner =
false;
162 foreach(lc, stmt_options)
167 strcmp(defel->
defname,
"connect") == 0)
176 strcmp(defel->
defname,
"enabled") == 0)
185 strcmp(defel->
defname,
"create_slot") == 0)
194 strcmp(defel->
defname,
"slot_name") == 0)
203 if (strcmp(
opts->slot_name,
"none") == 0)
204 opts->slot_name = NULL;
209 strcmp(defel->
defname,
"copy_data") == 0)
218 strcmp(defel->
defname,
"synchronous_commit") == 0)
232 strcmp(defel->
defname,
"refresh") == 0)
241 strcmp(defel->
defname,
"binary") == 0)
250 strcmp(defel->
defname,
"streaming") == 0)
258 else if (strcmp(defel->
defname,
"two_phase") == 0)
270 (
errcode(ERRCODE_SYNTAX_ERROR),
271 errmsg(
"unrecognized subscription parameter: \"%s\"", defel->
defname)));
280 strcmp(defel->
defname,
"disable_on_error") == 0)
289 strcmp(defel->
defname,
"password_required") == 0)
298 strcmp(defel->
defname,
"run_as_owner") == 0)
307 strcmp(defel->
defname,
"origin") == 0)
326 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
327 errmsg(
"unrecognized origin value: \"%s\"",
opts->origin));
330 strcmp(defel->
defname,
"lsn") == 0)
339 if (strcmp(lsn_str,
"none") == 0)
349 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
350 errmsg(
"invalid WAL location (LSN): %s", lsn_str)));
358 (
errcode(ERRCODE_SYNTAX_ERROR),
359 errmsg(
"unrecognized subscription parameter: \"%s\"", defel->
defname)));
372 (
errcode(ERRCODE_SYNTAX_ERROR),
374 errmsg(
"%s and %s are mutually exclusive options",
375 "connect = false",
"enabled = true")));
377 if (
opts->create_slot &&
380 (
errcode(ERRCODE_SYNTAX_ERROR),
381 errmsg(
"%s and %s are mutually exclusive options",
382 "connect = false",
"create_slot = true")));
384 if (
opts->copy_data &&
387 (
errcode(ERRCODE_SYNTAX_ERROR),
388 errmsg(
"%s and %s are mutually exclusive options",
389 "connect = false",
"copy_data = true")));
392 opts->enabled =
false;
393 opts->create_slot =
false;
394 opts->copy_data =
false;
401 if (!
opts->slot_name &&
408 (
errcode(ERRCODE_SYNTAX_ERROR),
410 errmsg(
"%s and %s are mutually exclusive options",
411 "slot_name = NONE",
"enabled = true")));
414 (
errcode(ERRCODE_SYNTAX_ERROR),
416 errmsg(
"subscription with %s must also set %s",
417 "slot_name = NONE",
"enabled = false")));
420 if (
opts->create_slot)
424 (
errcode(ERRCODE_SYNTAX_ERROR),
426 errmsg(
"%s and %s are mutually exclusive options",
427 "slot_name = NONE",
"create_slot = true")));
430 (
errcode(ERRCODE_SYNTAX_ERROR),
432 errmsg(
"subscription with %s must also set %s",
433 "slot_name = NONE",
"create_slot = false")));
449 foreach(lc, publications)
479 Oid tableRow[1] = {TEXTOID};
483 " pg_catalog.pg_publication t WHERE\n"
494 errmsg(
"could not receive list of publications from the publisher: %s",
497 publicationsCopy =
list_copy(publications);
525 errcode(ERRCODE_UNDEFINED_OBJECT),
526 errmsg_plural(
"publication %s does not exist on the publisher",
527 "publications %s do not exist on the publisher",
546 "publicationListToArray to array",
573 bool nulls[Natts_pg_subscription];
603 if (
opts.create_slot)
613 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
614 errmsg(
"permission denied to create subscription"),
615 errdetail(
"Only roles with privileges of the \"%s\" role may create subscriptions.",
616 "pg_create_subscription")));
635 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
636 errmsg(
"password_required=false is superuser-only"),
637 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
643 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
644 if (strncmp(
stmt->subname,
"regress_", 8) != 0)
645 elog(
WARNING,
"subscriptions created by regression test cases should have names starting with \"regress_\"");
657 errmsg(
"subscription \"%s\" already exists",
662 opts.slot_name == NULL)
666 if (
opts.synchronous_commit == NULL)
667 opts.synchronous_commit =
"off";
669 conninfo =
stmt->conninfo;
670 publications =
stmt->publication;
680 memset(nulls,
false,
sizeof(nulls));
683 Anum_pg_subscription_oid);
687 values[Anum_pg_subscription_subname - 1] =
693 values[Anum_pg_subscription_subtwophasestate - 1] =
700 values[Anum_pg_subscription_subconninfo - 1] =
703 values[Anum_pg_subscription_subslotname - 1] =
706 nulls[Anum_pg_subscription_subslotname - 1] =
true;
707 values[Anum_pg_subscription_subsynccommit - 1] =
709 values[Anum_pg_subscription_subpublications - 1] =
711 values[Anum_pg_subscription_suborigin - 1] =
736 bool must_use_password;
744 (
errcode(ERRCODE_CONNECTION_FAILURE),
745 errmsg(
"could not connect to the publisher: %s",
err)));
751 opts.origin, NULL, 0,
stmt->subname);
757 table_state =
opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
784 if (
opts.create_slot)
786 bool twophase_enabled =
false;
806 if (
opts.twophase && !
opts.copy_data && tables !=
NIL)
807 twophase_enabled =
true;
812 if (twophase_enabled)
816 (
errmsg(
"created replication slot \"%s\" on publisher",
828 (
errmsg(
"subscription was created, but is not connected"),
829 errhint(
"To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.")));
847 List *validate_publications)
852 Oid *subrel_local_oids;
853 Oid *pubrel_local_oids;
859 typedef struct SubRemoveRels
864 SubRemoveRels *sub_remove_rels;
866 bool must_use_password;
877 (
errcode(ERRCODE_CONNECTION_FAILURE),
878 errmsg(
"could not connect to the publisher: %s",
err)));
882 if (validate_publications)
897 subrel_local_oids =
palloc(subrel_count *
sizeof(
Oid));
899 foreach(lc, subrel_states)
903 subrel_local_oids[off++] = relstate->
relid;
905 qsort(subrel_local_oids, subrel_count,
909 sub->
origin, subrel_local_oids,
910 subrel_count, sub->
name);
916 sub_remove_rels =
palloc(subrel_count *
sizeof(SubRemoveRels));
928 foreach(lc, pubrel_names)
939 pubrel_local_oids[off++] = relid;
941 if (!bsearch(&relid, subrel_local_oids,
945 copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
961 for (off = 0; off < subrel_count; off++)
963 Oid relid = subrel_local_oids[off];
965 if (!bsearch(&relid, pubrel_local_oids,
991 sub_remove_rels[remove_rel_len].relid = relid;
992 sub_remove_rels[remove_rel_len++].state =
state;
1002 if (
state != SUBREL_STATE_READY)
1017 sizeof(originname));
1034 for (off = 0; off < remove_rel_len; off++)
1036 if (sub_remove_rels[off].
state != SUBREL_STATE_READY &&
1037 sub_remove_rels[off].
state != SUBREL_STATE_SYNCDONE)
1052 syncslotname,
sizeof(syncslotname));
1076 bool nulls[Natts_pg_subscription];
1077 bool replaces[Natts_pg_subscription];
1081 bool update_tuple =
false;
1095 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1096 errmsg(
"subscription \"%s\" does not exist",
1115 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1116 errmsg(
"password_required=false is superuser-only"),
1117 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1124 memset(nulls,
false,
sizeof(nulls));
1125 memset(replaces,
false,
sizeof(replaces));
1138 supported_opts, &
opts);
1151 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1152 errmsg(
"cannot set %s for enabled subscription",
1153 "slot_name = NONE")));
1156 values[Anum_pg_subscription_subslotname - 1] =
1159 nulls[Anum_pg_subscription_subslotname - 1] =
true;
1160 replaces[Anum_pg_subscription_subslotname - 1] =
true;
1163 if (
opts.synchronous_commit)
1165 values[Anum_pg_subscription_subsynccommit - 1] =
1167 replaces[Anum_pg_subscription_subsynccommit - 1] =
true;
1172 values[Anum_pg_subscription_subbinary - 1] =
1174 replaces[Anum_pg_subscription_subbinary - 1] =
true;
1179 values[Anum_pg_subscription_substream - 1] =
1181 replaces[Anum_pg_subscription_substream - 1] =
true;
1186 values[Anum_pg_subscription_subdisableonerr - 1]
1188 replaces[Anum_pg_subscription_subdisableonerr - 1]
1197 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1198 errmsg(
"password_required=false is superuser-only"),
1199 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1201 values[Anum_pg_subscription_subpasswordrequired - 1]
1203 replaces[Anum_pg_subscription_subpasswordrequired - 1]
1209 values[Anum_pg_subscription_subrunasowner - 1] =
1211 replaces[Anum_pg_subscription_subrunasowner - 1] =
true;
1216 values[Anum_pg_subscription_suborigin - 1] =
1218 replaces[Anum_pg_subscription_suborigin - 1] =
true;
1221 update_tuple =
true;
1233 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1234 errmsg(
"cannot enable subscription that does not have a slot name")));
1236 values[Anum_pg_subscription_subenabled - 1] =
1238 replaces[Anum_pg_subscription_subenabled - 1] =
true;
1243 update_tuple =
true;
1254 values[Anum_pg_subscription_subconninfo - 1] =
1256 replaces[Anum_pg_subscription_subconninfo - 1] =
true;
1257 update_tuple =
true;
1264 supported_opts, &
opts);
1266 values[Anum_pg_subscription_subpublications - 1] =
1268 replaces[Anum_pg_subscription_subpublications - 1] =
true;
1270 update_tuple =
true;
1277 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1278 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1279 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1287 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1288 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1289 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1311 supported_opts, &
opts);
1314 values[Anum_pg_subscription_subpublications - 1] =
1316 replaces[Anum_pg_subscription_subpublications - 1] =
true;
1318 update_tuple =
true;
1324 List *validate_publications = (isadd) ?
stmt->publication : NULL;
1328 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1329 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1333 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1334 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1342 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1343 errmsg(
"ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1345 errhint(
"Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1347 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1348 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1356 validate_publications);
1366 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1367 errmsg(
"ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
1391 (
errcode(ERRCODE_SYNTAX_ERROR),
1392 errmsg(
"ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
1393 errhint(
"Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1420 originname,
sizeof(originname));
1427 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1428 errmsg(
"skip WAL location (LSN %X/%X) must be greater than origin LSN %X/%X",
1434 replaces[Anum_pg_subscription_subskiplsn - 1] =
true;
1436 update_tuple =
true;
1441 elog(
ERROR,
"unrecognized ALTER SUBSCRIPTION kind %d",
1491 bool must_use_password;
1506 if (!
stmt->missing_ok)
1508 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1509 errmsg(
"subscription \"%s\" does not exist",
1513 (
errmsg(
"subscription \"%s\" does not exist, skipping",
1521 subowner = form->subowner;
1522 must_use_password = !
superuser_arg(subowner) && form->subpasswordrequired;
1540 Anum_pg_subscription_subname);
1545 Anum_pg_subscription_subconninfo);
1550 Anum_pg_subscription_subslotname, &isnull);
1597 foreach(lc, subworkers)
1625 foreach(lc, rstates)
1642 sizeof(originname));
1666 if (!slotname && rstates ==
NIL)
1704 foreach(lc, rstates)
1725 if (rstate->
state != SUBREL_STATE_SYNCDONE)
1730 sizeof(syncslotname));
1782 (
errmsg(
"dropped replication slot \"%s\" on publisher",
1787 res->sqlstate == ERRCODE_UNDEFINED_OBJECT)
1791 (
errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
1792 slotname,
res->err)));
1798 (
errcode(ERRCODE_CONNECTION_FAILURE),
1799 errmsg(
"could not drop replication slot \"%s\" on publisher: %s",
1800 slotname,
res->err)));
1823 if (form->subowner == newOwnerId)
1834 if (!form->subpasswordrequired && !
superuser())
1836 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1837 errmsg(
"password_required=false is superuser-only"),
1838 errhint(
"Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1856 form->subowner = newOwnerId;
1891 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1892 errmsg(
"subscription \"%s\" does not exist",
name)));
1923 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1924 errmsg(
"subscription with OID %u does not exist", subid)));
1949 bool copydata,
char *origin,
Oid *subrel_local_oids,
1950 int subrel_count,
char *
subname)
1955 Oid tableRow[1] = {TEXTOID};
1959 if (!copydata || !origin ||
1965 "SELECT DISTINCT P.pubname AS pubname\n"
1966 "FROM pg_publication P,\n"
1967 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
1968 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
1969 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
1970 "WHERE C.oid = GPT.relid AND P.pubname IN (");
1979 for (
i = 0;
i < subrel_count;
i++)
1981 Oid relid = subrel_local_oids[
i];
1985 appendStringInfo(&cmd,
"AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
1986 schemaname, tablename);
1994 (
errcode(ERRCODE_CONNECTION_FAILURE),
1995 errmsg(
"could not receive list of replicated tables from the publisher: %s",
2030 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2031 errmsg(
"subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2033 errdetail_plural(
"The subscription being created subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2034 "The subscription being created subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2036 errhint(
"Verify that initial data copied from the publisher tables did not come from other origins."));
2071 tableRow[2] = INT2VECTOROID;
2089 " FROM pg_class c\n"
2090 " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2091 " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2092 " FROM pg_publication\n"
2093 " WHERE pubname IN ( %s )) AS gpt\n"
2094 " ON gpt.relid = c.oid\n",
2101 tableRow[2] = NAMEARRAYOID;
2105 if (check_columnlist)
2109 " WHERE t.pubname IN (");
2119 (
errcode(ERRCODE_CONNECTION_FAILURE),
2120 errmsg(
"could not receive list of replicated tables from the publisher: %s",
2139 if (check_columnlist &&
list_member(tablelist, rv))
2141 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2142 errmsg(
"cannot use different column lists for table \"%s.%s\" in different publications",
2145 tablelist =
lappend(tablelist, rv);
2166 foreach(lc, rstates)
2179 if (rstate->
state != SUBREL_STATE_SYNCDONE)
2184 sizeof(syncslotname));
2185 elog(
WARNING,
"could not drop tablesync replication slot \"%s\"",
2191 (
errcode(ERRCODE_CONNECTION_FAILURE),
2192 errmsg(
"could not connect to publisher when attempting to drop replication slot \"%s\": %s",
2195 errhint(
"Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
2196 "ALTER SUBSCRIPTION ... DISABLE",
2197 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
2211 foreach(cell, publist)
2216 foreach(pcell, publist)
2223 if (strcmp(
name, pname) == 0)
2226 errmsg(
"publication name \"%s\" used more than once",
2254 foreach(lc, newpublist)
2260 foreach(lc2, oldpublist)
2264 if (strcmp(
name, pubname) == 0)
2270 errmsg(
"publication \"%s\" is already in subscription \"%s\"",
2279 if (addpub && !found)
2281 else if (!addpub && !found)
2283 (
errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2284 errmsg(
"publication \"%s\" is not in subscription \"%s\"",
2294 (
errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
2295 errmsg(
"cannot drop all the publications from a subscription")));
2351 (
errcode(ERRCODE_SYNTAX_ERROR),
2352 errmsg(
"%s requires a Boolean value or \"parallel\"",
bool has_privs_of_role(Oid member, Oid role)
void check_can_set_role(Oid member, Oid role)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
ArrayType * construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
void LogicalRepWorkersWakeupAtCommit(Oid subid)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
static Datum values[MAXATTR]
#define CStringGetTextDatum(s)
#define TextDatumGetCString(d)
#define OidIsValid(objectId)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
char * get_database_name(Oid dbid)
static void PGresult * res
elog(ERROR, "%s: %s", p2, msg)
bool defGetBoolean(DefElem *def)
char * defGetString(DefElem *def)
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
void load_file(const char *filename, bool restricted)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errmsg_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
#define DirectFunctionCall1(func, arg1)
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
if(TABLE==NULL||TABLE_index==NULL)
List * logicalrep_workers_find(Oid subid, bool only_running)
void logicalrep_worker_stop(Oid subid, Oid relid)
void ApplyLauncherWakeupAtCommit(void)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Assert(fmt[strlen(fmt) - 1] !='\n')
List * lappend(List *list, void *datum)
List * list_copy(const List *oldlist)
List * list_append_unique(List *list, void *datum)
List * list_delete(List *list, void *datum)
void list_free(List *list)
bool list_member(const List *list, const void *datum)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
#define AccessExclusiveLock
char * get_namespace_name(Oid nspid)
char get_rel_relkind(Oid relid)
Oid get_rel_namespace(Oid relid)
char * get_rel_name(Oid relid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext CurrentMemoryContext
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
Datum namein(PG_FUNCTION_ARGS)
#define RangeVarGetRelid(relation, lockmode, missing_ok)
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define InvokeObjectDropHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
int oid_cmp(const void *p1, const void *p2)
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_create(const char *roname)
XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
static AmcheckOptions opts
static int server_version
static int list_length(const List *l)
#define foreach_delete_current(lst, cell)
Datum pg_lsn_in(PG_FUNCTION_ARGS)
static Datum LSNGetDatum(XLogRecPtr X)
static XLogRecPtr DatumGetLSN(Datum X)
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void RemoveSubscriptionRel(Oid subid, Oid relid)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
Subscription * GetSubscription(Oid subid, bool missing_ok)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define LOGICALREP_ORIGIN_NONE
#define LOGICALREP_STREAM_ON
#define LOGICALREP_ORIGIN_ANY
#define LOGICALREP_STREAM_OFF
#define LOGICALREP_STREAM_PARALLEL
#define LOGICALREP_TWOPHASE_STATE_DISABLED
#define LOGICALREP_TWOPHASE_STATE_PENDING
#define LOGICALREP_TWOPHASE_STATE_ENABLED
FormData_pg_subscription * Form_pg_subscription
void pgstat_drop_subscription(Oid subid)
void pgstat_create_subscription(Oid subid)
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
static Datum PointerGetDatum(const void *X)
static Name DatumGetName(Datum X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static Datum CStringGetDatum(const char *X)
static Datum CharGetDatum(char X)
char * quote_literal_cstr(const char *rawstr)
Datum quote_literal(PG_FUNCTION_ARGS)
#define RelationGetDescr(relation)
const char * quote_identifier(const char *ident)
bool ReplicationSlotValidateName(const char *name, int elevel)
#define ERRCODE_DUPLICATE_OBJECT
StringInfo makeStringInfo(void)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
char * synchronous_commit
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
char defGetStreamingMode(DefElem *def)
#define SUBOPT_CREATE_SLOT
#define SUBOPT_PASSWORD_REQUIRED
ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
#define SUBOPT_SYNCHRONOUS_COMMIT
static void check_duplicates_in_publist(List *publist, Datum *datums)
static Datum publicationListToArray(List *publist)
static void get_publications_str(List *publications, StringInfo dest, bool quote_literal)
static void parse_subscription_options(ParseState *pstate, List *stmt_options, bits32 supported_opts, SubOpts *opts)
static void check_publications(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
#define SUBOPT_DISABLE_ON_ERR
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
bool superuser_arg(Oid roleid)
void ReleaseSysCache(HeapTuple tuple)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
#define SearchSysCacheCopy1(cacheId, key1)
#define SearchSysCacheCopy2(cacheId, key1, key2)
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
void UpdateTwoPhaseState(Oid suboid, char new_state)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
String * makeString(char *str)
static WalReceiverConn * wrconn
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_connect(conninfo, logical, must_use_password, appname, err)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
#define LSN_FORMAT_ARGS(lsn)
#define XLogRecPtrIsInvalid(r)
#define InvalidXLogRecPtr