31 #include "utils/fmgroids.h" 57 elog(
ERROR,
"cache lookup failed for subscription %u", subid);
64 sub->
dbid = subform->subdbid;
66 sub->
owner = subform->subowner;
67 sub->
enabled = subform->subenabled;
72 Anum_pg_subscription_subconninfo,
80 Anum_pg_subscription_subslotname,
90 Anum_pg_subscription_subsynccommit,
98 Anum_pg_subscription_subpublications,
124 Anum_pg_subscription_subdbid,
170 (
errcode(ERRCODE_UNDEFINED_OBJECT),
171 errmsg(
"subscription \"%s\" does not exist", subname)));
193 elog(
ERROR,
"cache lookup failed for subscription %u", subid);
219 TEXTOID, -1,
false,
'i',
220 &elems, NULL, &nelems);
225 for (i = 0; i < nelems; i++)
240 bool nulls[Natts_pg_subscription_rel];
252 elog(
ERROR,
"subscription table %u in subscription %u already exists",
256 memset(values, 0,
sizeof(values));
257 memset(nulls,
false,
sizeof(nulls));
260 values[Anum_pg_subscription_rel_srsubstate - 1] =
CharGetDatum(state);
262 values[Anum_pg_subscription_rel_srsublsn - 1] =
LSNGetDatum(sublsn);
264 nulls[Anum_pg_subscription_rel_srsublsn - 1] =
true;
286 bool nulls[Natts_pg_subscription_rel];
288 bool replaces[Natts_pg_subscription_rel];
299 elog(
ERROR,
"subscription table %u in subscription %u does not exist",
303 memset(values, 0,
sizeof(values));
304 memset(nulls,
false,
sizeof(nulls));
305 memset(replaces,
false,
sizeof(replaces));
307 replaces[Anum_pg_subscription_rel_srsubstate - 1] =
true;
308 values[Anum_pg_subscription_rel_srsubstate - 1] =
CharGetDatum(state);
310 replaces[Anum_pg_subscription_rel_srsublsn - 1] =
true;
312 values[Anum_pg_subscription_rel_srsublsn - 1] =
LSNGetDatum(sublsn);
314 nulls[Anum_pg_subscription_rel_srsublsn - 1] =
true;
354 return SUBREL_STATE_UNKNOWN;
357 elog(
ERROR,
"subscription table %u in subscription %u does not exist",
363 Anum_pg_subscription_rel_srsubstate, &isnull);
367 Anum_pg_subscription_rel_srsublsn, &isnull);
398 Anum_pg_subscription_rel_srsubid,
407 Anum_pg_subscription_rel_srrelid,
443 Anum_pg_subscription_rel_srsubid,
458 relstate->
relid = subrel->srrelid;
459 relstate->
state = subrel->srsubstate;
460 relstate->
lsn = subrel->srsublsn;
490 Anum_pg_subscription_rel_srsubid,
495 Anum_pg_subscription_rel_srsubstate,
510 relstate->
relid = subrel->srrelid;
511 relstate->
state = subrel->srsubstate;
512 relstate->
lsn = subrel->srsublsn;
Value * makeString(char *str)
#define InvalidXLogRecPtr
void table_close(Relation relation, LOCKMODE lockmode)
void systable_endscan(SysScanDesc sysscan)
void RemoveSubscriptionRel(Oid subid, Oid relid)
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
#define RelationGetDescr(relation)
char * pstrdup(const char *in)
int CountDBSubscriptions(Oid dbid)
int errcode(int sqlerrcode)
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
FormData_pg_subscription * Form_pg_subscription
void heap_freetuple(HeapTuple htup)
#define OidIsValid(objectId)
void list_free_deep(List *list)
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
char * get_subscription_name(Oid subid, bool missing_ok)
Subscription * GetSubscription(Oid subid, bool missing_ok)
HeapTuple systable_getnext(SysScanDesc sysscan)
void pfree(void *pointer)
#define ObjectIdGetDatum(X)
static List * textarray_to_stringlist(ArrayType *textarray)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
List * GetSubscriptionRelations(Oid subid)
#define CStringGetDatum(X)
#define ereport(elevel, rest)
List * lappend(List *list, void *datum)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
#define TextDatumGetCString(d)
void ReleaseSysCache(HeapTuple tuple)
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Oid get_subscription_oid(const char *subname, bool missing_ok)
#define HeapTupleIsValid(tuple)
FormData_pg_subscription_rel * Form_pg_subscription_rel
#define Assert(condition)
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, bool missing_ok)
void FreeSubscription(Subscription *sub)
void deconstruct_array(ArrayType *array, Oid elmtype, int elmlen, bool elmbyval, char elmalign, Datum **elemsp, bool **nullsp, int *nelemsp)
static void table_endscan(TableScanDesc scan)
static Datum values[MAXATTR]
int errmsg(const char *fmt,...)
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
#define SearchSysCacheCopy2(cacheId, key1, key2)
Relation table_open(Oid relationId, LOCKMODE lockmode)
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
List * GetSubscriptionNotReadyRelations(Oid subid)
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
#define BTEqualStrategyNumber
#define DatumGetArrayTypeP(X)