60 bool *enabled_given,
bool *enabled,
62 bool *slot_name_given,
char **slot_name,
66 bool *binary_given,
bool *binary,
67 bool *streaming_given,
bool *streaming)
70 bool connect_given =
false;
71 bool create_slot_given =
false;
72 bool copy_data_given =
false;
73 bool refresh_given =
false;
76 Assert(!connect || (enabled && create_slot && copy_data));
82 *enabled_given =
false;
89 *slot_name_given =
false;
94 if (synchronous_commit)
95 *synchronous_commit = NULL;
100 *binary_given =
false;
105 *streaming_given =
false;
118 (
errcode(ERRCODE_SYNTAX_ERROR),
119 errmsg(
"conflicting or redundant options")));
121 connect_given =
true;
124 else if (strcmp(defel->
defname,
"enabled") == 0 && enabled)
128 (
errcode(ERRCODE_SYNTAX_ERROR),
129 errmsg(
"conflicting or redundant options")));
131 *enabled_given =
true;
134 else if (strcmp(defel->
defname,
"create_slot") == 0 && create_slot)
136 if (create_slot_given)
138 (
errcode(ERRCODE_SYNTAX_ERROR),
139 errmsg(
"conflicting or redundant options")));
141 create_slot_given =
true;
144 else if (strcmp(defel->
defname,
"slot_name") == 0 && slot_name)
146 if (*slot_name_given)
148 (
errcode(ERRCODE_SYNTAX_ERROR),
149 errmsg(
"conflicting or redundant options")));
151 *slot_name_given =
true;
155 if (strcmp(*slot_name,
"none") == 0)
158 else if (strcmp(defel->
defname,
"copy_data") == 0 && copy_data)
162 (
errcode(ERRCODE_SYNTAX_ERROR),
163 errmsg(
"conflicting or redundant options")));
165 copy_data_given =
true;
168 else if (strcmp(defel->
defname,
"synchronous_commit") == 0 &&
171 if (*synchronous_commit)
173 (
errcode(ERRCODE_SYNTAX_ERROR),
174 errmsg(
"conflicting or redundant options")));
183 else if (strcmp(defel->
defname,
"refresh") == 0 && refresh)
187 (
errcode(ERRCODE_SYNTAX_ERROR),
188 errmsg(
"conflicting or redundant options")));
190 refresh_given =
true;
193 else if (strcmp(defel->
defname,
"binary") == 0 && binary)
197 (
errcode(ERRCODE_SYNTAX_ERROR),
198 errmsg(
"conflicting or redundant options")));
200 *binary_given =
true;
203 else if (strcmp(defel->
defname,
"streaming") == 0 && streaming)
205 if (*streaming_given)
207 (
errcode(ERRCODE_SYNTAX_ERROR),
208 errmsg(
"conflicting or redundant options")));
210 *streaming_given =
true;
215 (
errcode(ERRCODE_SYNTAX_ERROR),
216 errmsg(
"unrecognized subscription parameter: \"%s\"", defel->
defname)));
223 if (connect && !*connect)
226 if (enabled && *enabled_given && *enabled)
228 (
errcode(ERRCODE_SYNTAX_ERROR),
230 errmsg(
"%s and %s are mutually exclusive options",
231 "connect = false",
"enabled = true")));
233 if (create_slot && create_slot_given && *create_slot)
235 (
errcode(ERRCODE_SYNTAX_ERROR),
236 errmsg(
"%s and %s are mutually exclusive options",
237 "connect = false",
"create_slot = true")));
239 if (copy_data && copy_data_given && *copy_data)
241 (
errcode(ERRCODE_SYNTAX_ERROR),
242 errmsg(
"%s and %s are mutually exclusive options",
243 "connect = false",
"copy_data = true")));
247 *create_slot =
false;
255 if (slot_name && *slot_name_given && !*slot_name)
257 if (enabled && *enabled_given && *enabled)
259 (
errcode(ERRCODE_SYNTAX_ERROR),
261 errmsg(
"%s and %s are mutually exclusive options",
262 "slot_name = NONE",
"enabled = true")));
264 if (create_slot && create_slot_given && *create_slot)
266 (
errcode(ERRCODE_SYNTAX_ERROR),
267 errmsg(
"%s and %s are mutually exclusive options",
268 "slot_name = NONE",
"create_slot = true")));
270 if (enabled && !*enabled_given && *enabled)
272 (
errcode(ERRCODE_SYNTAX_ERROR),
274 errmsg(
"subscription with %s must also set %s",
275 "slot_name = NONE",
"enabled = false")));
277 if (create_slot && !create_slot_given && *create_slot)
279 (
errcode(ERRCODE_SYNTAX_ERROR),
280 errmsg(
"subscription with %s must also set %s",
281 "slot_name = NONE",
"create_slot = false")));
300 "publicationListToArray to array",
306 foreach(cell, publist)
312 foreach(pcell, publist)
319 if (strcmp(name, pname) == 0)
321 (
errcode(ERRCODE_SYNTAX_ERROR),
322 errmsg(
"publication name \"%s\" used more than once",
332 TEXTOID, -1,
false, TYPALIGN_INT);
348 bool nulls[Natts_pg_subscription];
357 bool streaming_given;
375 &enabled_given, &enabled,
377 &slotname_given, &slotname,
381 &binary_given, &binary,
382 &streaming_given, &streaming);
395 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
396 errmsg(
"must be superuser to create subscriptions")));
402 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS 403 if (strncmp(stmt->
subname,
"regress_", 8) != 0)
404 elog(
WARNING,
"subscriptions created by regression test cases should have names starting with \"regress_\"");
416 errmsg(
"subscription \"%s\" already exists",
420 if (!slotname_given && slotname == NULL)
424 if (synchronous_commit == NULL)
425 synchronous_commit =
"off";
437 memset(values, 0,
sizeof(values));
438 memset(nulls,
false,
sizeof(nulls));
441 Anum_pg_subscription_oid);
444 values[Anum_pg_subscription_subname - 1] =
447 values[Anum_pg_subscription_subenabled - 1] =
BoolGetDatum(enabled);
448 values[Anum_pg_subscription_subbinary - 1] =
BoolGetDatum(binary);
449 values[Anum_pg_subscription_substream - 1] =
BoolGetDatum(streaming);
450 values[Anum_pg_subscription_subconninfo - 1] =
453 values[Anum_pg_subscription_subslotname - 1] =
456 nulls[Anum_pg_subscription_subslotname - 1] =
true;
457 values[Anum_pg_subscription_subsynccommit - 1] =
459 values[Anum_pg_subscription_subpublications - 1] =
470 snprintf(originname,
sizeof(originname),
"pg_%u", subid);
489 (
errmsg(
"could not connect to the publisher: %s", err)));
497 table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
531 (
errmsg(
"created replication slot \"%s\" on publisher",
544 (
errmsg(
"tables were not subscribed, you will have to run %s to subscribe the tables",
545 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
565 Oid *subrel_local_oids;
566 Oid *pubrel_local_oids;
577 (
errmsg(
"could not connect to the publisher: %s", err)));
595 foreach(lc, subrel_states)
599 subrel_local_oids[off++] = relstate->
relid;
613 foreach(lc, pubrel_names)
624 pubrel_local_oids[off++] = relid;
626 if (!bsearch(&relid, subrel_local_oids,
630 copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
633 (
errmsg(
"table \"%s.%s\" added to subscription \"%s\"",
645 for (off = 0; off <
list_length(subrel_states); off++)
647 Oid relid = subrel_local_oids[off];
649 if (!bsearch(&relid, pubrel_local_oids,
657 (
errmsg(
"table \"%s.%s\" removed from subscription \"%s\"",
673 bool nulls[Natts_pg_subscription];
674 bool replaces[Natts_pg_subscription];
678 bool update_tuple =
false;
690 (
errcode(ERRCODE_UNDEFINED_OBJECT),
691 errmsg(
"subscription \"%s\" does not exist",
708 memset(values, 0,
sizeof(values));
709 memset(nulls,
false,
sizeof(nulls));
710 memset(replaces,
false,
sizeof(replaces));
721 bool streaming_given;
728 &slotname_given, &slotname,
732 &binary_given, &binary,
733 &streaming_given, &streaming);
739 (
errcode(ERRCODE_SYNTAX_ERROR),
740 errmsg(
"cannot set %s for enabled subscription",
741 "slot_name = NONE")));
744 values[Anum_pg_subscription_subslotname - 1] =
747 nulls[Anum_pg_subscription_subslotname - 1] =
true;
748 replaces[Anum_pg_subscription_subslotname - 1] =
true;
751 if (synchronous_commit)
753 values[Anum_pg_subscription_subsynccommit - 1] =
755 replaces[Anum_pg_subscription_subsynccommit - 1] =
true;
760 values[Anum_pg_subscription_subbinary - 1] =
762 replaces[Anum_pg_subscription_subbinary - 1] =
true;
767 values[Anum_pg_subscription_substream - 1] =
769 replaces[Anum_pg_subscription_substream - 1] =
true;
783 &enabled_given, &enabled,
795 (
errcode(ERRCODE_SYNTAX_ERROR),
796 errmsg(
"cannot enable subscription that does not have a slot name")));
798 values[Anum_pg_subscription_subenabled - 1] =
800 replaces[Anum_pg_subscription_subenabled - 1] =
true;
815 values[Anum_pg_subscription_subconninfo - 1] =
817 replaces[Anum_pg_subscription_subconninfo - 1] =
true;
836 values[Anum_pg_subscription_subpublications - 1] =
838 replaces[Anum_pg_subscription_subpublications - 1] =
true;
847 (
errcode(ERRCODE_SYNTAX_ERROR),
848 errmsg(
"ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
849 errhint(
"Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
866 (
errcode(ERRCODE_SYNTAX_ERROR),
867 errmsg(
"ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
886 elog(
ERROR,
"unrecognized ALTER SUBSCRIPTION kind %d",
949 (
errcode(ERRCODE_UNDEFINED_OBJECT),
950 errmsg(
"subscription \"%s\" does not exist",
954 (
errmsg(
"subscription \"%s\" does not exist, skipping",
979 Anum_pg_subscription_subname, &isnull);
985 Anum_pg_subscription_subconninfo, &isnull);
991 Anum_pg_subscription_subslotname, &isnull);
1037 foreach(lc, subworkers)
1052 snprintf(originname,
sizeof(originname),
"pg_%u", subid);
1079 (
errmsg(
"could not connect to publisher when attempting to " 1080 "drop the replication slot \"%s\"", slotname),
1083 errhint(
"Use %s to disassociate the subscription from the slot.",
1084 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
1094 (
errmsg(
"could not drop the replication slot \"%s\" on publisher",
1099 (
errmsg(
"dropped replication slot \"%s\" on publisher",
1125 if (form->subowner == newOwnerId)
1135 (
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1136 errmsg(
"permission denied to change owner of subscription \"%s\"",
1138 errhint(
"The owner of a subscription must be a superuser.")));
1140 form->subowner = newOwnerId;
1171 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1172 errmsg(
"subscription \"%s\" does not exist", name)));
1203 (
errcode(ERRCODE_UNDEFINED_OBJECT),
1204 errmsg(
"subscription with OID %u does not exist", subid)));
1223 Oid tableRow[2] = {TEXTOID, TEXTOID};
1232 " FROM pg_catalog.pg_publication_tables t\n" 1233 " WHERE t.pubname IN (");
1235 foreach(lc, publications)
1253 (
errmsg(
"could not receive list of replicated tables from the publisher: %s",
1271 tablelist =
lappend(tablelist, rv);
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt)
#define InvalidXLogRecPtr
List * logicalrep_workers_find(Oid subid, bool only_running)
void MemoryContextDelete(MemoryContext context)
#define AllocSetContextCreate
Datum namein(PG_FUNCTION_ARGS)
void table_close(Relation relation, LOCKMODE lockmode)
int errhint(const char *fmt,...)
char * quote_literal_cstr(const char *rawstr)
#define InvokeObjectPostCreateHook(classId, objectId, subId)
void RemoveSubscriptionRel(Oid subid, Oid relid)
const char * quote_identifier(const char *ident)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
#define RelationGetDescr(relation)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
#define PointerGetDatum(X)
#define RangeVarGetRelid(relation, lockmode, missing_ok)
#define walrcv_check_conninfo(conninfo)
char get_rel_relkind(Oid relid)
char * pstrdup(const char *in)
Oid get_rel_namespace(Oid relid)
void replorigin_drop(RepOriginId roident, bool nowait)
#define InvokeObjectDropHook(classId, objectId, subId)
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
ArrayType * construct_array(Datum *elems, int nelems, Oid elmtype, int elmlen, bool elmbyval, char elmalign)
int errcode(int sqlerrcode)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
static Datum publicationListToArray(List *publist)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
#define connect(s, name, namelen)
#define DirectFunctionCall1(func, arg1)
FormData_pg_subscription * Form_pg_subscription
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void heap_freetuple(HeapTuple htup)
ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
#define OidIsValid(objectId)
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
void LWLockRelease(LWLock *lock)
Subscription * GetSubscription(Oid subid, bool missing_ok)
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
bool defGetBoolean(DefElem *def)
static void walrcv_clear_result(WalRcvExecResult *walres)
void pfree(void *pointer)
void appendStringInfo(StringInfo str, const char *fmt,...)
#define ObjectIdGetDatum(X)
char * defGetString(DefElem *def)
#define ALLOCSET_DEFAULT_SIZES
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh, bool *binary_given, bool *binary, bool *streaming_given, bool *streaming)
void appendStringInfoString(StringInfo str, const char *s)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
char * get_namespace_name(Oid nspid)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
AlterSubscriptionType kind
#define SubscriptionObjectIndexId
void logicalrep_worker_stop(Oid subid, Oid relid)
int errdetail(const char *fmt,...)
List * GetSubscriptionRelations(Oid subid)
#define CStringGetDatum(X)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
MemoryContext CurrentMemoryContext
RepOriginId replorigin_create(char *roname)
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
bool superuser_arg(Oid roleid)
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
List * lappend(List *list, void *datum)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
#define TextDatumGetCString(d)
void ReleaseSysCache(HeapTuple tuple)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
#define ereport(elevel,...)
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
#define HeapTupleIsValid(tuple)
Tuplestorestate * tuplestore
#define Assert(condition)
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
void load_file(const char *filename, bool restricted)
int oid_cmp(const void *p1, const void *p2)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
static int list_length(const List *l)
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
#define walrcv_disconnect(conn)
#define InvalidRepOriginId
#define ObjectAddressSet(addr, class_id, object_id)
static Datum values[MAXATTR]
#define SearchSysCacheCopy1(cacheId, key1)
#define AccessExclusiveLock
int errmsg(const char *fmt,...)
void list_free(List *list)
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
#define CStringGetTextDatum(s)
void logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
#define qsort(a, b, c, d)
void ApplyLauncherWakeupAtCommit(void)
#define SearchSysCacheCopy2(cacheId, key1, key2)
Relation table_open(Oid relationId, LOCKMODE lockmode)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
#define ERRCODE_DUPLICATE_OBJECT
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
char * get_rel_name(Oid relid)
const TupleTableSlotOps TTSOpsMinimalTuple
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
static void AlterSubscription_refresh(Subscription *sub, bool copy_data)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
#define walrcv_connect(conninfo, logical, appname, err)