29 #include "utils/fmgroids.h"
56 elog(
ERROR,
"cache lookup failed for subscription %u", subid);
63 sub->
dbid = subform->subdbid;
64 sub->
skiplsn = subform->subskiplsn;
66 sub->
owner = subform->subowner;
67 sub->
enabled = subform->subenabled;
68 sub->
binary = subform->subbinary;
69 sub->
stream = subform->substream;
74 sub->
failover = subform->subfailover;
79 Anum_pg_subscription_subconninfo);
85 Anum_pg_subscription_subslotname,
95 Anum_pg_subscription_subsynccommit);
101 Anum_pg_subscription_subpublications);
107 Anum_pg_subscription_suborigin);
134 Anum_pg_subscription_subdbid,
172 bool nulls[Natts_pg_subscription];
173 bool replaces[Natts_pg_subscription];
182 elog(
ERROR,
"cache lookup failed for subscription %u", subid);
188 memset(nulls,
false,
sizeof(nulls));
189 memset(replaces,
false,
sizeof(replaces));
193 replaces[Anum_pg_subscription_subenabled - 1] =
true;
222 for (
i = 0;
i < nelems;
i++)
241 bool nulls[Natts_pg_subscription_rel];
253 elog(
ERROR,
"subscription table %u in subscription %u already exists",
258 memset(nulls,
false,
sizeof(nulls));
265 nulls[Anum_pg_subscription_rel_srsublsn - 1] =
true;
295 bool nulls[Natts_pg_subscription_rel];
297 bool replaces[Natts_pg_subscription_rel];
308 elog(
ERROR,
"subscription table %u in subscription %u does not exist",
313 memset(nulls,
false,
sizeof(nulls));
314 memset(replaces,
false,
sizeof(replaces));
316 replaces[Anum_pg_subscription_rel_srsubstate - 1] =
true;
319 replaces[Anum_pg_subscription_rel_srsublsn - 1] =
true;
323 nulls[Anum_pg_subscription_rel_srsublsn - 1] =
true;
364 return SUBREL_STATE_UNKNOWN;
372 Anum_pg_subscription_rel_srsublsn, &isnull);
404 Anum_pg_subscription_rel_srsubid,
413 Anum_pg_subscription_rel_srrelid,
434 if (!
OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
437 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
438 errmsg(
"could not drop relation mapping for subscription \"%s\"",
440 errdetail(
"Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
447 errhint(
"Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
448 "ALTER SUBSCRIPTION ... ENABLE",
449 "DROP SUBSCRIPTION ...")));
476 Anum_pg_subscription_rel_srsubid,
513 Anum_pg_subscription_rel_srsubid,
519 Anum_pg_subscription_rel_srsubstate,
536 relstate->
relid = subrel->srrelid;
537 relstate->
state = subrel->srsubstate;
539 Anum_pg_subscription_rel_srsublsn, &isnull);
#define DatumGetArrayTypeP(X)
void deconstruct_array_builtin(ArrayType *array, Oid elmtype, Datum **elemsp, bool **nullsp, int *nelemsp)
static Datum values[MAXATTR]
#define TextDatumGetCString(d)
#define OidIsValid(objectId)
static void PGresult * res
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void systable_endscan(SysScanDesc sysscan)
HeapTuple systable_getnext(SysScanDesc sysscan)
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
char * get_rel_name(Oid relid)
char * get_subscription_name(Oid subid, bool missing_ok)
char * pstrdup(const char *in)
void pfree(void *pointer)
static Datum LSNGetDatum(XLogRecPtr X)
static XLogRecPtr DatumGetLSN(Datum X)
int CountDBSubscriptions(Oid dbid)
void FreeSubscription(Subscription *sub)
void DisableSubscription(Oid subid)
void RemoveSubscriptionRel(Oid subid, Oid relid)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
Subscription * GetSubscription(Oid subid, bool missing_ok)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
static List * textarray_to_stringlist(ArrayType *textarray)
bool HasSubscriptionRelations(Oid subid)
FormData_pg_subscription * Form_pg_subscription
FormData_pg_subscription_rel * Form_pg_subscription_rel
static Name DatumGetName(Datum X)
static Datum BoolGetDatum(bool X)
static Datum ObjectIdGetDatum(Oid X)
static Datum CharGetDatum(char X)
#define RelationGetDescr(relation)
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
#define BTEqualStrategyNumber
bool superuser_arg(Oid roleid)
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
#define SearchSysCacheCopy1(cacheId, key1)
#define SearchSysCacheCopy2(cacheId, key1, key2)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
static void table_endscan(TableScanDesc scan)
String * makeString(char *str)
#define InvalidXLogRecPtr