PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
pg_subscription.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_subscription.c
4  * replication subscriptions
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  * src/backend/catalog/pg_subscription.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "miscadmin.h"
18 
19 #include "access/genam.h"
20 #include "access/heapam.h"
21 #include "access/htup_details.h"
22 #include "access/xact.h"
23 
24 #include "catalog/indexing.h"
25 #include "catalog/pg_type.h"
28 
29 #include "nodes/makefuncs.h"
30 
31 #include "utils/array.h"
32 #include "utils/builtins.h"
33 #include "utils/fmgroids.h"
34 #include "utils/pg_lsn.h"
35 #include "utils/rel.h"
36 #include "utils/syscache.h"
37 
38 
39 static List *textarray_to_stringlist(ArrayType *textarray);
40 
41 /*
42  * Fetch the subscription from the syscache.
43  */
/*
 * Parameters:
 *   subid      - OID of the subscription to load.
 *   missing_ok - when true, a failed lookup yields NULL instead of an error.
 *
 * Returns a palloc'd Subscription whose name, conninfo and slotname are
 * separately allocated copies, or NULL when the tuple is not valid and
 * missing_ok is set.  Raises elog(ERROR) when the tuple is not valid and
 * missing_ok is false.
 *
 * NOTE(review): this text is an extract of a rendered source listing.  The
 * embedded line numbers skip 44, 53, 73, 75, 81, 83, 89, 91 and 94, so the
 * return-type line, the assignment of "tup", the first line and the
 * attribute argument of each of the three attribute fetches, and the
 * statement following the last Assert are not visible here.  The
 * cross-reference at the end of the listing gives the prototype as
 * "Subscription * GetSubscription(Oid subid, bool missing_ok)".
 */
45 GetSubscription(Oid subid, bool missing_ok)
46 {
47  HeapTuple tup;
48  Subscription *sub;
49  Form_pg_subscription subform;
50  Datum datum;
51  bool isnull;
52 
 /*
  * NOTE(review): the statement that sets "tup" (listing line 53) is
  * missing; presumably a syscache lookup keyed on subid -- confirm.
  */
54 
55  if (!HeapTupleIsValid(tup))
56  {
57  if (missing_ok)
58  return NULL;
59 
60  elog(ERROR, "cache lookup failed for subscription %u", subid);
61  }
62 
 /* These columns are read directly through the Form_pg_subscription struct. */
63  subform = (Form_pg_subscription) GETSTRUCT(tup);
64 
65  sub = (Subscription *) palloc(sizeof(Subscription));
66  sub->oid = subid;
67  sub->dbid = subform->subdbid;
68  sub->name = pstrdup(NameStr(subform->subname));
69  sub->owner = subform->subowner;
70  sub->enabled = subform->subenabled;
71 
72  /* Get conninfo */
74  tup,
76  &isnull);
77  Assert(!isnull);
 /*
  * NOTE(review): TextDatumGetCString presumably already returns a freshly
  * allocated C string, in which case the surrounding pstrdup only makes a
  * second copy and the first one is never freed -- confirm.
  */
78  sub->conninfo = pstrdup(TextDatumGetCString(datum));
79 
80  /* Get slotname */
82  tup,
84  &isnull);
85  Assert(!isnull);
86  sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
87 
88  /* Get publications */
90  tup,
92  &isnull);
93  Assert(!isnull);
 /*
  * NOTE(review): listing line 94 is missing; presumably it converts the
  * fetched datum into the subscription's publication list -- confirm.
  */
95 
 /* Drop the reference on the cached tuple before returning. */
96  ReleaseSysCache(tup);
97 
98  return sub;
99 }
100 
101 /*
102  * Return number of subscriptions defined in given database.
103  * Used by dropdb() to check if database can indeed be dropped.
104  */
/*
 * Parameter: dbid - database OID used as the scan key value.
 * Returns the number of tuples the scan produced.
 *
 * NOTE(review): this text is an extract of a rendered source listing; the
 * embedded line numbers skip 106, 114 and 117, so the function-name line,
 * the statement that opens "rel", and the attribute-number argument of
 * ScanKeyInit are not visible here.  The cross-reference at the end of the
 * listing gives the prototype as "int CountDBSubscriptions(Oid dbid)".
 */
105 int
107 {
108  int nsubs = 0;
109  Relation rel;
110  ScanKeyData scankey;
111  SysScanDesc scan;
112  HeapTuple tup;
113 
115 
 /*
  * Single OID-equality key against dbid.  NOTE(review): the column being
  * compared (listing line 117) is missing; presumably the subscription's
  * database column -- confirm.
  */
116  ScanKeyInit(&scankey,
118  BTEqualStrategyNumber, F_OIDEQ,
119  ObjectIdGetDatum(dbid));
120 
 /* No index is supplied (InvalidOid, indexOK = false) and no snapshot is passed. */
121  scan = systable_beginscan(rel, InvalidOid, false,
122  NULL, 1, &scankey);
123 
 /* Only the count matters; the tuples themselves are not examined. */
124  while (HeapTupleIsValid(tup = systable_getnext(scan)))
125  nsubs++;
126 
127  systable_endscan(scan);
128 
 /*
  * NOTE(review): closing with NoLock presumably keeps whatever lock the
  * hidden open call acquired -- confirm against listing line 114.
  */
129  heap_close(rel, NoLock);
130 
131  return nsubs;
132 }
133 
134 /*
135  * Free memory allocated by subscription struct.
136  */
/*
 * Parameter: sub - subscription struct to release; it is dereferenced
 * unconditionally, so it must not be NULL.  The struct itself is freed last,
 * after the members it points to.
 *
 * NOTE(review): this text is an extract of a rendered source listing; the
 * embedded line numbers skip 138 and 143, so the function-name line and one
 * statement between the slotname and struct frees are not visible here
 * (presumably the release of the publication list -- confirm).  The
 * cross-reference at the end of the listing gives the prototype as
 * "void FreeSubscription(Subscription *sub)".
 */
137 void
139 {
140  pfree(sub->name);
141  pfree(sub->conninfo);
142  pfree(sub->slotname);
144  pfree(sub);
145 }
146 
147 /*
148  * get_subscription_oid - given a subscription name, look up the OID
149  *
150  * If missing_ok is false, throw an error if name not found. If true, just
151  * return InvalidOid.
152  */
/*
 * NOTE(review): this text is an extract of a rendered source listing; the
 * embedded line numbers skip 158, so the start of the statement that assigns
 * "oid" is not visible here.  Only its final argument, the subscription name
 * wrapped as a Datum, survives.
 */
153 Oid
154 get_subscription_oid(const char *subname, bool missing_ok)
155 {
156  Oid oid;
157 
159  CStringGetDatum(subname));
 /* An invalid OID is returned as-is unless the caller asked for an error. */
160  if (!OidIsValid(oid) && !missing_ok)
161  ereport(ERROR,
162  (errcode(ERRCODE_UNDEFINED_OBJECT),
163  errmsg("subscription \"%s\" does not exist", subname)));
164  return oid;
165 }
166 
167 /*
168  * get_subscription_name - given a subscription OID, look up the name
169  */
/*
 * Parameter: subid - OID of the subscription.
 * Returns a pstrdup'd copy of the subscription name.  Raises elog(ERROR) when
 * the tuple is not valid; there is no missing_ok variant.
 *
 * NOTE(review): this text is an extract of a rendered source listing; the
 * embedded line numbers skip 171 and 177, so the function-name line and the
 * statement that sets "tup" are not visible here (presumably a syscache
 * lookup keyed on subid -- confirm).  The cross-reference at the end of the
 * listing gives the prototype as "char * get_subscription_name(Oid subid)".
 */
170 char *
172 {
173  HeapTuple tup;
174  char *subname;
175  Form_pg_subscription subform;
176 
178 
179  if (!HeapTupleIsValid(tup))
180  elog(ERROR, "cache lookup failed for subscription %u", subid);
181 
 /* Copy the name before the tuple reference is released below. */
182  subform = (Form_pg_subscription) GETSTRUCT(tup);
183  subname = pstrdup(NameStr(subform->subname));
184 
185  ReleaseSysCache(tup);
186 
187  return subname;
188 }
189 
190 /*
191  * Convert text array to list of strings.
192  *
193  * Note: the resulting list of strings is pallocated here.
194  */
195 static List *
197 {
198  Datum *elems;
199  int nelems, i;
200  List *res = NIL;
201 
202  deconstruct_array(textarray,
203  TEXTOID, -1, false, 'i',
204  &elems, NULL, &nelems);
205 
206  if (nelems == 0)
207  return NIL;
208 
209  for (i = 0; i < nelems; i++)
210  res = lappend(res, makeString(pstrdup(TextDatumGetCString(elems[i]))));
211 
212  return res;
213 }
214 
215 /*
216  * Set the state of a subscription table.
217  */
/*
 * Inserts a new mapping row when none exists for the lookup key, otherwise
 * updates the state and LSN columns of the existing row.  In both cases an
 * InvalidXLogRecPtr sublsn is stored as SQL NULL in the srsublsn column.
 *
 * Returns the OID produced by CatalogTupleInsert() for a new row, or the
 * OID of the modified tuple for an existing one.
 *
 * NOTE(review): this text is an extract of a rendered source listing; the
 * embedded line numbers skip 219, 226, 229, 232, 245-247 and 270, so the
 * function-name line, the declaration of "values", the statement that opens
 * "rel", the start of the lookup that sets "tup", three assignments in the
 * insert branch and one in the update branch are not visible here.  The
 * cross-reference at the end of the listing gives the prototype as
 * "Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
 * XLogRecPtr sublsn)".
 */
218 Oid
220  XLogRecPtr sublsn)
221 {
222  Relation rel;
223  HeapTuple tup;
224  Oid subrelid;
225  bool nulls[Natts_pg_subscription_rel];
227 
228  /* Prevent concurrent changes. */
230 
231  /* Try finding existing mapping. */
 /* Note the key order of the lookup: relid first, then subid. */
233  ObjectIdGetDatum(relid),
234  ObjectIdGetDatum(subid));
235 
236  /*
237  * If the record for given table does not exist yet create new
238  * record, otherwise update the existing one.
239  */
240  if (!HeapTupleIsValid(tup))
241  {
242  /* Form the tuple. */
243  memset(values, 0, sizeof(values));
244  memset(nulls, false, sizeof(nulls));
 /*
  * NOTE(review): listing lines 245-247 are missing; presumably they fill
  * in the subscription, relation and state columns -- confirm.
  */
248  if (sublsn != InvalidXLogRecPtr)
249  values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
250  else
251  nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
252 
253  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
254 
255  /* Insert tuple into catalog. */
256  subrelid = CatalogTupleInsert(rel, tup);
257 
258  heap_freetuple(tup);
259  }
260  else
261  {
262  bool replaces[Natts_pg_subscription_rel];
263 
264  /* Update the tuple. */
265  memset(values, 0, sizeof(values));
266  memset(nulls, false, sizeof(nulls));
267  memset(replaces, false, sizeof(replaces));
268 
 /* Only the state and LSN columns are marked for replacement. */
269  replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
 /*
  * NOTE(review): listing line 270 is missing; presumably it stores the new
  * state value for the column flagged above -- confirm.
  */
271 
272  replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
273  if (sublsn != InvalidXLogRecPtr)
274  values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
275  else
276  nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
277 
 /*
  * NOTE(review): "tup" is overwritten with the modified tuple, and the
  * visible code frees neither the looked-up tuple nor the modified one,
  * unlike the insert branch, which calls heap_freetuple() -- confirm
  * whether this is intentional.
  */
278  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
279  replaces);
280 
281  /* Update the catalog. */
282  CatalogTupleUpdate(rel, &tup->t_self, tup);
283 
284  subrelid = HeapTupleGetOid(tup);
285  }
286 
287  /* Cleanup. */
 /*
  * NOTE(review): closing with NoLock presumably keeps the lock taken by
  * the hidden open call (listing line 229) -- confirm.
  */
288  heap_close(rel, NoLock);
289 
290  return subrelid;
291 }
292 
293 /*
294  * Get state of subscription table.
295  *
296  * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true.
297  */
/*
 * Output parameter: *sublsn is written on every non-error return.  It is set
 * to InvalidXLogRecPtr when the mapping is missing (with missing_ok) or when
 * the fetched LSN attribute is NULL, and to the stored LSN otherwise.  The
 * pointer is dereferenced unconditionally, so it must not be NULL.
 *
 * Raises elog(ERROR) when the mapping is not found and missing_ok is false.
 *
 * NOTE(review): this text is an extract of a rendered source listing; the
 * embedded line numbers skip 299, 308, 311, 319, 329, 330, 333, 334 and 342,
 * so the function-name line, the statement that opens "rel", the start of
 * the lookup that sets "tup", one statement in the missing_ok branch, both
 * attribute fetches into "d", and one cleanup statement are not visible
 * here.  The cross-reference at the end of the listing gives the prototype
 * as "char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
 * bool missing_ok)".
 */
298 char
300  bool missing_ok)
301 {
302  Relation rel;
303  HeapTuple tup;
304  char substate;
305  bool isnull;
306  Datum d;
307 
309 
310  /* Try finding the mapping. */
 /* Note the key order of the lookup: relid first, then subid. */
312  ObjectIdGetDatum(relid),
313  ObjectIdGetDatum(subid));
314 
315  if (!HeapTupleIsValid(tup))
316  {
317  if (missing_ok)
318  {
 /*
  * NOTE(review): listing line 319 is missing; presumably it closes "rel"
  * before the early return -- confirm.
  */
320  *sublsn = InvalidXLogRecPtr;
321  return SUBREL_STATE_UNKNOWN;
322  }
323 
324  elog(ERROR, "subscription table %u in subscription %u does not exist",
325  relid, subid);
326  }
327 
328  /* Get the state. */
 /* The state attribute is asserted to be non-NULL. */
331  Assert(!isnull);
332  substate = DatumGetChar(d);
 /*
  * NOTE(review): listing lines 333-334 are missing; presumably they fetch
  * the LSN attribute into "d" and refresh "isnull" -- confirm.  Unlike the
  * state, the LSN is allowed to be NULL.
  */
335  if (isnull)
336  *sublsn = InvalidXLogRecPtr;
337  else
338  *sublsn = DatumGetLSN(d);
339 
340  /* Cleanup */
341  ReleaseSysCache(tup);
343 
344  return substate;
345 }
346 
347 /*
348  * Drop subscription relation mapping. These can be for a particular
349  * subscription, or for a particular relation, or both.
350  */
/*
 * Parameters: subid and relid each add one OID-equality scan key when valid;
 * an invalid OID contributes no key.  Every tuple returned by the scan is
 * deleted.
 *
 * NOTE(review): when both OIDs are invalid the scan is started with zero
 * keys, which presumably matches (and deletes) every row -- confirm callers
 * never pass two invalid OIDs.
 *
 * NOTE(review): this text is an extract of a rendered source listing; the
 * embedded line numbers skip 352, 361, 366, 367, 375, 376, 383 and 389, so
 * the function-name line, the statement that opens "rel", the attribute and
 * strategy arguments of both ScanKeyInit calls, the loop header that
 * fetches "tup", and the final statement before the closing brace are not
 * visible here.  The cross-reference at the end of the listing gives the
 * prototype as "void RemoveSubscriptionRel(Oid subid, Oid relid)".
 */
351 void
353 {
354  Relation rel;
355  HeapScanDesc scan;
356  ScanKeyData skey[2];
357  HeapTuple tup;
358  int nkeys = 0;
359 
360  /* Prevent concurrent changes (see SetSubscriptionRelState()). */
362 
363  if (OidIsValid(subid))
364  {
365  ScanKeyInit(&skey[nkeys++],
368  F_OIDEQ,
369  ObjectIdGetDatum(subid));
370  }
371 
372  if (OidIsValid(relid))
373  {
374  ScanKeyInit(&skey[nkeys++],
377  F_OIDEQ,
378  ObjectIdGetDatum(relid));
379  }
380 
381  /* Do the search and delete what we found. */
382  scan = heap_beginscan_catalog(rel, nkeys, skey);
 /*
  * NOTE(review): the loop header (listing line 383) is missing; presumably
  * it iterates over the scan's tuples, assigning each to "tup" -- confirm.
  */
384  {
385  simple_heap_delete(rel, &tup->t_self);
386  }
387  heap_endscan(scan);
388 
390 }
391 
392 
393 /*
394  * Get all relations for subscription.
395  *
396  * Returned list is palloced in current memory context.
397  */
/*
 * Parameter: subid - subscription OID used as the single scan key value.
 * Returns a List of palloc'd SubscriptionRelState structs, one per tuple
 * returned by the scan, or NIL when there are none.
 *
 * NOTE(review): this text is an extract of a rendered source listing; the
 * embedded line numbers skip 399, 408, 411, 420 and 435, so the
 * function-name line, the statement that opens "rel", the attribute-number
 * argument of ScanKeyInit, the declaration of "subrel", and one cleanup
 * statement are not visible here.  The cross-reference at the end of the
 * listing gives the prototype as "List * GetSubscriptionRelations(Oid
 * subid)".
 */
398 List *
400 {
401  List *res = NIL;
402  Relation rel;
403  HeapTuple tup;
404  int nkeys = 0;
405  ScanKeyData skey[2];
406  SysScanDesc scan;
407 
409 
 /* skey has room for two keys, but only one is initialized here. */
410  ScanKeyInit(&skey[nkeys++],
412  BTEqualStrategyNumber, F_OIDEQ,
413  ObjectIdGetDatum(subid));
414 
 /* No index is supplied (InvalidOid, indexOK = false) and no snapshot is passed. */
415  scan = systable_beginscan(rel, InvalidOid, false,
416  NULL, nkeys, skey);
417 
418  while (HeapTupleIsValid(tup = systable_getnext(scan)))
419  {
421  SubscriptionRelState *relstate;
422 
423  subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
424 
425  relstate = (SubscriptionRelState *)palloc(sizeof(SubscriptionRelState));
426  relstate->relid = subrel->srrelid;
427  relstate->state = subrel->srsubstate;
 /*
  * NOTE(review): srsublsn is read straight from the struct with no null
  * check, whereas SetSubscriptionRelState() stores NULL in that column
  * for InvalidXLogRecPtr and GetSubscriptionRelState() tests isnull
  * before using it -- confirm this read is safe for NULL values.
  */
428  relstate->lsn = subrel->srsublsn;
429 
430  res = lappend(res, relstate);
431  }
432 
433  /* Cleanup */
434  systable_endscan(scan);
436 
437  return res;
438 }
439 
440 /*
441  * Get all relations for subscription that are not in a ready state.
442  *
443  * Returned list is palloced in current memory context.
444  */
/*
 * Parameter: subid - subscription OID used as the first scan key value.
 * Returns a List of palloc'd SubscriptionRelState structs, one per tuple
 * that passes both scan keys, or NIL when there are none.
 *
 * NOTE(review): this text is an extract of a rendered source listing; the
 * embedded line numbers skip 446, 455, 458, 463, 465, 472 and 487, so the
 * function-name line, the statement that opens "rel", the attribute-number
 * arguments of both ScanKeyInit calls, the comparison value of the second
 * key, the declaration of "subrel", and one cleanup statement are not
 * visible here.  The cross-reference at the end of the listing gives the
 * prototype as "List * GetSubscriptionNotReadyRelations(Oid subid)".
 */
445 List *
447 {
448  List *res = NIL;
449  Relation rel;
450  HeapTuple tup;
451  int nkeys = 0;
452  ScanKeyData skey[2];
453  SysScanDesc scan;
454 
456 
457  ScanKeyInit(&skey[nkeys++],
459  BTEqualStrategyNumber, F_OIDEQ,
460  ObjectIdGetDatum(subid));
461 
 /*
  * NOTE(review): the second key pairs BTEqualStrategyNumber with the
  * inequality procedure F_CHARNE.  Because the scan below is started with
  * indexOK = false, presumably only the procedure is evaluated, so the key
  * acts as a "not equal" filter -- confirm.  The value compared against
  * (listing line 465) is missing; presumably the ready state.
  */
462  ScanKeyInit(&skey[nkeys++],
464  BTEqualStrategyNumber, F_CHARNE,
466 
 /* No index is supplied (InvalidOid, indexOK = false) and no snapshot is passed. */
467  scan = systable_beginscan(rel, InvalidOid, false,
468  NULL, nkeys, skey);
469 
470  while (HeapTupleIsValid(tup = systable_getnext(scan)))
471  {
473  SubscriptionRelState *relstate;
474 
475  subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
476 
477  relstate = (SubscriptionRelState *)palloc(sizeof(SubscriptionRelState));
478  relstate->relid = subrel->srrelid;
479  relstate->state = subrel->srsubstate;
 /*
  * NOTE(review): srsublsn is read straight from the struct with no null
  * check, whereas SetSubscriptionRelState() stores NULL in that column
  * for InvalidXLogRecPtr and GetSubscriptionRelState() tests isnull
  * before using it -- confirm this read is safe for NULL values.
  */
480  relstate->lsn = subrel->srsublsn;
481 
482  res = lappend(res, relstate);
483  }
484 
485  /* Cleanup */
486  systable_endscan(scan);
488 
489  return res;
490 }
Value * makeString(char *str)
Definition: value.c:53
#define NIL
Definition: pg_list.h:69
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:499
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
#define Anum_pg_subscription_rel_srsubid
#define Anum_pg_subscription_subpublications
void heap_endscan(HeapScanDesc scan)
Definition: heapam.c:1578
void RemoveSubscriptionRel(Oid subid, Oid relid)
#define RelationGetDescr(relation)
Definition: rel.h:429
#define SubscriptionRelRelationId
#define TEXTOID
Definition: pg_type.h:324
char * pstrdup(const char *in)
Definition: mcxt.c:1077
int CountDBSubscriptions(Oid dbid)
#define AccessShareLock
Definition: lockdefs.h:36
int errcode(int sqlerrcode)
Definition: elog.c:575
#define LSNGetDatum(X)
Definition: pg_lsn.h:22
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define heap_close(r, l)
Definition: heapam.h:97
FormData_pg_subscription * Form_pg_subscription
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1374
unsigned int Oid
Definition: postgres_ext.h:31
#define OidIsValid(objectId)
Definition: c.h:538
void list_free_deep(List *list)
Definition: list.c:1147
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:328
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:152
#define GetSysCacheOid2(cacheId, key1, key2)
Definition: syscache.h:181
#define DatumGetName(X)
Definition: postgres.h:591
Subscription * GetSubscription(Oid subid, bool missing_ok)
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:416
void pfree(void *pointer)
Definition: mcxt.c:950
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:162
static List * textarray_to_stringlist(ArrayType *textarray)
ItemPointerData t_self
Definition: htup.h:65
#define SUBREL_STATE_UNKNOWN
#define SubscriptionRelationId
#define NoLock
Definition: lockdefs.h:34
#define Anum_pg_subscription_rel_srsublsn
#define RowExclusiveLock
Definition: lockdefs.h:38
List * GetSubscriptionRelations(Oid subid)
#define CStringGetDatum(X)
Definition: postgres.h:584
List * publications
HeapScanDesc heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
Definition: heapam.c:1399
#define ereport(elevel, rest)
Definition: elog.h:122
#define Natts_pg_subscription_rel
#define Anum_pg_subscription_subslotname
List * lappend(List *list, void *datum)
Definition: list.c:128
#define Anum_pg_subscription_subdbid
#define TextDatumGetCString(d)
Definition: builtins.h:92
uintptr_t Datum
Definition: postgres.h:372
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1116
#define DatumGetChar(X)
Definition: postgres.h:415
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1278
Oid MyDatabaseId
Definition: globals.c:76
HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction)
Definition: heapam.c:1794
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1284
#define InvalidOid
Definition: postgres_ext.h:36
#define DatumGetLSN(X)
Definition: pg_lsn.h:21
#define Anum_pg_subscription_subconninfo
Oid get_subscription_oid(const char *subname, bool missing_ok)
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:229
FormData_pg_subscription_rel * Form_pg_subscription_rel
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
Definition: regguts.h:298
#define ShareRowExclusiveLock
Definition: lockdefs.h:42
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:210
void simple_heap_delete(Relation relation, ItemPointer tid)
Definition: heapam.c:3398
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, bool missing_ok)
#define CharGetDatum(X)
Definition: postgres.h:422
void FreeSubscription(Subscription *sub)
void deconstruct_array(ArrayType *array, Oid elmtype, int elmlen, bool elmbyval, char elmalign, Datum **elemsp, bool **nullsp, int *nelemsp)
Definition: arrayfuncs.c:3475
static Datum values[MAXATTR]
Definition: bootstrap.c:162
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
#define NameStr(name)
Definition: c.h:499
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define Anum_pg_subscription_rel_srsubstate
#define elog
Definition: elog.h:219
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:163
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:793
Definition: pg_list.h:45
List * GetSubscriptionNotReadyRelations(Oid subid)
Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define SUBREL_STATE_READY
#define BTEqualStrategyNumber
Definition: stratnum.h:31
#define Anum_pg_subscription_rel_srrelid
char * get_subscription_name(Oid subid)
#define DatumGetArrayTypeP(X)
Definition: array.h:242
#define SearchSysCache2(cacheId, key1, key2)
Definition: syscache.h:154