PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
pg_publication.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_publication.c
4  * publication C API manipulation
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  * pg_publication.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "funcapi.h"
18 #include "miscadmin.h"
19 
20 #include "access/genam.h"
21 #include "access/hash.h"
22 #include "access/heapam.h"
23 #include "access/htup_details.h"
24 #include "access/xact.h"
25 
26 #include "catalog/catalog.h"
27 #include "catalog/dependency.h"
28 #include "catalog/index.h"
29 #include "catalog/indexing.h"
30 #include "catalog/namespace.h"
31 #include "catalog/objectaccess.h"
32 #include "catalog/objectaddress.h"
33 #include "catalog/pg_type.h"
34 #include "catalog/pg_publication.h"
36 
37 #include "utils/array.h"
38 #include "utils/builtins.h"
39 #include "utils/catcache.h"
40 #include "utils/fmgroids.h"
41 #include "utils/inval.h"
42 #include "utils/lsyscache.h"
43 #include "utils/rel.h"
44 #include "utils/syscache.h"
45 
46 /*
47  * Check whether the relation can be added to a publication, and throw an
48  * appropriate error if not.
    *
    * Rejected, in this order: partitioned tables, anything else that is not a
    * plain table, system catalog relations, and relations that do not need
    * WAL.  Each failed check is reported with ereport(ERROR, ...).
49  */
50 static void
52 {
53  /*
     * Give more specific error for partitioned tables.  This test has to come
     * before the generic relkind test below, which would otherwise reject a
     * partitioned table with the less helpful "is not a table" message.
     */
54  if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
55  ereport(ERROR,
56  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
57  errmsg("\"%s\" is a partitioned table",
58  RelationGetRelationName(targetrel)),
59  errdetail("Adding partitioned tables to publications is not supported."),
60  errhint("You can add the table partitions individually.")));
61 
62  /* Must be a plain table (relkind RELKIND_RELATION) */
63  if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
64  ereport(ERROR,
65  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66  errmsg("\"%s\" is not a table",
67  RelationGetRelationName(targetrel)),
68  errdetail("Only tables can be added to publications.")));
69 
70  /* Can't be a system catalog relation */
71  if (IsCatalogRelation(targetrel))
72  ereport(ERROR,
73  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74  errmsg("\"%s\" is a system table",
75  RelationGetRelationName(targetrel)),
76  errdetail("System tables cannot be added to publications.")));
77 
78  /* UNLOGGED and TEMP relations cannot be part of a publication. */
79  if (!RelationNeedsWAL(targetrel))
80  ereport(ERROR,
81  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
82  errmsg("table \"%s\" cannot be replicated",
83  RelationGetRelationName(targetrel)),
84  errdetail("Temporary and unlogged relations cannot be replicated.")));
85 }
86 
87 /*
88  * Returns true if the relation represented by oid and Form_pg_class entry
89  * is publishable.
90  *
91  * Performs checks similar to check_publication_add_relation() above, but
92  * does not need the relation to be opened and does not throw errors.
    * Persistence is tested directly on relpersistence: only
    * RELPERSISTENCE_PERMANENT relations qualify.
93  *
94  * Note this also excludes all tables with relid < FirstNormalObjectId,
95  * i.e. all tables created during initdb. This mainly affects the preinstalled
96  * information_schema. (IsCatalogClass() only checks for these inside
97  * pg_catalog and toast schemas.)
98  */
99 static bool
101 {
102  return reltuple->relkind == RELKIND_RELATION &&
103  !IsCatalogClass(relid, reltuple) &&
104  reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
105  relid >= FirstNormalObjectId;
106 }
107 
108 /*
109  * Insert a new publication / relation mapping.
     *
     * Returns the object address of the newly inserted mapping.  If the
     * mapping already exists, returns InvalidObjectAddress when if_not_exists
     * is true and raises an ERROR otherwise.
     *
     * The new mapping is given DEPENDENCY_AUTO dependencies on both the
     * publication and the relation, and the relation's relcache entry is
     * invalidated before returning.
110  */
113  bool if_not_exists)
114 {
115  Relation rel;
116  HeapTuple tup;
118  bool nulls[Natts_pg_publication_rel];
119  Oid relid = RelationGetRelid(targetrel);
     /* Oid returned by CatalogTupleInsert(); object id of the new mapping */
120  Oid prrelid;
     /* looked up up front; only pub->name is read, for the duplicate error */
121  Publication *pub = GetPublication(pubid);
122  ObjectAddress myself,
123  referenced;
124 
126 
127  /*
128  * Check for duplicates. Note that this does not really prevent
129  * duplicates; it is here just to provide a nicer error message in the
130  * common case. The real protection is the unique key on the catalog.
131  */
133  ObjectIdGetDatum(pubid)))
134  {
136 
137  if (if_not_exists)
138  return InvalidObjectAddress;
139 
140  ereport(ERROR,
142  errmsg("relation \"%s\" is already member of publication \"%s\"",
143  RelationGetRelationName(targetrel), pub->name)));
144  }
145 
147 
148  /* Form a tuple: both arrays are cleared, then the two key columns set. */
149  memset(values, 0, sizeof(values));
150  memset(nulls, false, sizeof(nulls));
151 
152  values[Anum_pg_publication_rel_prpubid - 1] =
153  ObjectIdGetDatum(pubid);
154  values[Anum_pg_publication_rel_prrelid - 1] =
155  ObjectIdGetDatum(relid);
156 
157  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
158 
159  /* Insert tuple into catalog. */
160  prrelid = CatalogTupleInsert(rel, tup);
161  heap_freetuple(tup);
162 
     /* "myself" is the address returned to the caller. */
163  ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
164 
165  /* Add dependency on the publication */
166  ObjectAddressSet(referenced, PublicationRelationId, pubid);
167  recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
168 
169  /* Add dependency on the relation */
170  ObjectAddressSet(referenced, RelationRelationId, relid);
171  recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
172 
173  /* Close the table. */
175 
176  /* Invalidate relcache so that publication info is rebuilt. */
177  CacheInvalidateRelcache(targetrel);
178 
179  return myself;
180 }
181 
182 
183 /*
184  * Gets the list of publication oids for a relation oid.
     *
     * Returns NIL when the relation has no publication mappings.  The oids
     * are copied into the result list, so the list stays usable after the
     * catalog cache list has been released.
185  */
186 List *
188 {
189  List *result = NIL;
190  CatCList *pubrellist;
191  int i;
192 
193  /* Find all publications associated with the relation. */
195  ObjectIdGetDatum(relid));
     /* Collect prpubid from every mapping entry in the cache list. */
196  for (i = 0; i < pubrellist->n_members; i++)
197  {
198  HeapTuple tup = &pubrellist->members[i]->tuple;
199  Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
200 
201  result = lappend_oid(result, pubid);
202  }
203 
204  ReleaseSysCacheList(pubrellist);
205 
206  return result;
207 }
208 
209 /*
210  * Gets the list of relation oids for a publication.
211  *
212  * This should only be used for normal publications; FOR ALL TABLES
213  * publications should use GetAllTablesPublicationRelations().
     *
     * Returns NIL when the scan finds no mapping for the publication.
214  */
215 List *
217 {
218  List *result;
219  Relation pubrelsrel;
220  ScanKeyData scankey;
221  SysScanDesc scan;
222  HeapTuple tup;
223 
224  /* Find all relations associated with the publication. */
226 
     /* Scan key: equality on the publication oid. */
227  ScanKeyInit(&scankey,
229  BTEqualStrategyNumber, F_OIDEQ,
230  ObjectIdGetDatum(pubid));
231 
233  true, NULL, 1, &scankey);
234 
     /* Collect prrelid from every matching mapping row. */
235  result = NIL;
236  while (HeapTupleIsValid(tup = systable_getnext(scan)))
237  {
239 
240  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
241 
242  result = lappend_oid(result, pubrel->prrelid);
243  }
244 
245  systable_endscan(scan);
246  heap_close(pubrelsrel, AccessShareLock);
247 
248  return result;
249 }
250 
251 /*
252  * Gets the list of publication oids for publications marked as
     * FOR ALL TABLES.
     *
     * Returns NIL when no such publication is found.
253  */
254 List *
256 {
257  List *result;
258  Relation rel;
259  ScanKeyData scankey;
260  SysScanDesc scan;
261  HeapTuple tup;
262 
263  /* Find all publications that are marked as for all tables. */
265 
     /* Scan key: boolean column equal to true. */
266  ScanKeyInit(&scankey,
268  BTEqualStrategyNumber, F_BOOLEQ,
269  BoolGetDatum(true));
270 
     /* No index is supplied (InvalidOid, indexOK = false) for this scan. */
271  scan = systable_beginscan(rel, InvalidOid, false,
272  NULL, 1, &scankey);
273 
     /* The oid of each matching row is the publication oid. */
274  result = NIL;
275  while (HeapTupleIsValid(tup = systable_getnext(scan)))
276  result = lappend_oid(result, HeapTupleGetOid(tup));
277 
278  systable_endscan(scan);
280 
281  return result;
282 }
283 
284 /*
285  * Gets the list of all relations published by FOR ALL TABLES publication(s).
     *
     * The result is every scanned relation that is_publishable_class()
     * accepts; NIL if there is none.
286  */
287 List *
289 {
290  Relation classRel;
291  ScanKeyData key[1];
292  HeapScanDesc scan;
293  HeapTuple tuple;
294  List *result = NIL;
295 
297 
     /* Single scan key: equality on a char-typed column. */
298  ScanKeyInit(&key[0],
300  BTEqualStrategyNumber, F_CHAREQ,
302 
303  scan = heap_beginscan_catalog(classRel, 1, key);
304 
     /* Keep only the relations that pass is_publishable_class(). */
305  while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
306  {
307  Oid relid = HeapTupleGetOid(tuple);
308  Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
309 
310  if (is_publishable_class(relid, relForm))
311  result = lappend_oid(result, relid);
312  }
313 
314  heap_endscan(scan);
315  heap_close(classRel, AccessShareLock);
316 
317  return result;
318 }
319 
320 /*
321  * Get publication using oid.
322  *
323  * The Publication struct and its data are palloc'ed here.  Raises an ERROR
     * ("cache lookup failed") if no valid tuple is found for pubid.
324  */
325 Publication *
327 {
328  HeapTuple tup;
329  Publication *pub;
330  Form_pg_publication pubform;
331 
333 
334  if (!HeapTupleIsValid(tup))
335  elog(ERROR, "cache lookup failed for publication %u", pubid);
336 
337  pubform = (Form_pg_publication) GETSTRUCT(tup);
338 
     /*
      * Copy everything needed out of the tuple; the name is duplicated with
      * pstrdup() so the result does not point into the cache entry that is
      * released below.
      */
339  pub = (Publication *) palloc(sizeof(Publication));
340  pub->oid = pubid;
341  pub->name = pstrdup(NameStr(pubform->pubname));
342  pub->alltables = pubform->puballtables;
343  pub->pubactions.pubinsert = pubform->pubinsert;
344  pub->pubactions.pubupdate = pubform->pubupdate;
345  pub->pubactions.pubdelete = pubform->pubdelete;
346 
347  ReleaseSysCache(tup);
348 
349  return pub;
350 }
351 
352 
353 /*
354  * Get Publication using name.
     *
     * If no valid oid is found for pubname: returns NULL when missing_ok is
     * true, otherwise raises an ERROR (ERRCODE_UNDEFINED_OBJECT).  On success
     * the result comes from GetPublication().
355  */
356 Publication *
357 GetPublicationByName(const char *pubname, bool missing_ok)
358 {
359  Oid oid;
360 
     /*
      * NOTE(review): the statement that assigns oid is not present in this
      * listing; presumably a by-name lookup of the publication oid -- confirm
      * against the real source file.
      */
362  if (!OidIsValid(oid))
363  {
364  if (missing_ok)
365  return NULL;
366 
367  ereport(ERROR,
368  (errcode(ERRCODE_UNDEFINED_OBJECT),
369  errmsg("publication \"%s\" does not exist", pubname)));
370  }
371 
372  return GetPublication(oid);
373 }
374 
375 /*
376  * get_publication_oid - given a publication name, look up the OID
377  *
378  * If missing_ok is false, throw an error if the name is not found. If true,
379  * just return InvalidOid.
380  */
381 Oid
382 get_publication_oid(const char *pubname, bool missing_ok)
383 {
384  Oid oid;
385 
     /*
      * NOTE(review): the statement that assigns oid is not present in this
      * listing; presumably a by-name lookup of the publication oid -- confirm
      * against the real source file.
      */
387  if (!OidIsValid(oid) && !missing_ok)
388  ereport(ERROR,
389  (errcode(ERRCODE_UNDEFINED_OBJECT),
390  errmsg("publication \"%s\" does not exist", pubname)));
391  return oid;
392 }
393 
394 /*
395  * get_publication_name - given a publication Oid, look up the name
     *
     * Returns a pstrdup'ed copy of the name.  Raises an ERROR ("cache lookup
     * failed") if no valid tuple is found for pubid.
396  */
397 char *
399 {
400  HeapTuple tup;
401  char *pubname;
402  Form_pg_publication pubform;
403 
405 
406  if (!HeapTupleIsValid(tup))
407  elog(ERROR, "cache lookup failed for publication %u", pubid);
408 
     /* Copy the name before the cache entry is released. */
409  pubform = (Form_pg_publication) GETSTRUCT(tup);
410  pubname = pstrdup(NameStr(pubform->pubname));
411 
412  ReleaseSysCache(tup);
413 
414  return pubname;
415 }
416 
417 /*
418  * Returns Oids of tables in a publication.
     *
     * Set-returning function: argument 0 is the publication name (text), and
     * each call returns the next table oid.  The first call builds the table
     * list and stores an iteration cursor in funcctx->user_fctx.  Raises an
     * ERROR if the publication does not exist (the lookup is done with
     * missing_ok = false).
419  */
420 Datum
422 {
423  FuncCallContext *funcctx;
     /*
      * NOTE(review): this conversion runs on every call, although pubname is
      * only read inside the first-call block below.
      */
424  char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
425  Publication *publication;
426  List *tables;
427  ListCell **lcp;
428 
429  /* stuff done only on the first call of the function */
430  if (SRF_IS_FIRSTCALL())
431  {
432  MemoryContext oldcontext;
433 
434  /* create a function context for cross-call persistence */
435  funcctx = SRF_FIRSTCALL_INIT();
436 
437  /* switch to memory context appropriate for multiple function calls */
438  oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
439 
     /* FOR ALL TABLES publications take a different source for the list */
440  publication = GetPublicationByName(pubname, false);
441  if (publication->alltables)
443  else
444  tables = GetPublicationRelations(publication->oid);
     /*
      * The cursor (pointer to the current list cell) is allocated while the
      * multi-call context is current, and is handed to later calls through
      * user_fctx.
      */
445  lcp = (ListCell **) palloc(sizeof(ListCell *));
446  *lcp = list_head(tables);
447  funcctx->user_fctx = (void *) lcp;
448 
449  MemoryContextSwitchTo(oldcontext);
450  }
451 
452  /* stuff done on every call of the function */
453  funcctx = SRF_PERCALL_SETUP();
454  lcp = (ListCell **) funcctx->user_fctx;
455 
     /*
      * Return one oid per call.  The cursor is advanced before the value is
      * returned, so the next call resumes at the following cell.
      * SRF_RETURN_NEXT leaves the function, so this loop body runs at most
      * once per call (it behaves like an "if").
      */
456  while (*lcp != NULL)
457  {
458  Oid relid = lfirst_oid(*lcp);
459 
460  *lcp = lnext(*lcp);
461  SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
462  }
463 
     /* list exhausted */
464  SRF_RETURN_DONE(funcctx);
465 }
#define NIL
Definition: pg_list.h:69
PublicationActions pubactions
bool IsCatalogRelation(Relation relation)
Definition: catalog.c:91
int n_members
Definition: catcache.h:154
List * GetAllTablesPublicationRelations(void)
int errhint(const char *fmt,...)
Definition: elog.c:987
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:499
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
void heap_endscan(HeapScanDesc scan)
Definition: heapam.c:1578
bool IsCatalogClass(Oid relid, Form_pg_class reltuple)
Definition: catalog.c:103
#define RelationGetDescr(relation)
Definition: rel.h:429
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:285
char * pstrdup(const char *in)
Definition: mcxt.c:1077
#define RelationGetForm(relation)
Definition: rel.h:411
#define RelationRelationId
Definition: pg_class.h:29
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
int errcode(int sqlerrcode)
Definition: elog.c:575
#define PublicationRelationId
return result
Definition: formatting.c:1632
void recordDependencyOn(const ObjectAddress *depender, const ObjectAddress *referenced, DependencyType behavior)
Definition: pg_depend.c:44
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define heap_close(r, l)
Definition: heapam.h:97
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1372
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:164
#define OidIsValid(objectId)
Definition: c.h:538
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:289
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:328
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:156
#define FirstNormalObjectId
Definition: transam.h:94
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:273
#define PublicationRelRelationId
#define GetSysCacheOid1(cacheId, key1)
Definition: syscache.h:183
#define RELPERSISTENCE_PERMANENT
Definition: pg_class.h:170
CatCTup * members[FLEXIBLE_ARRAY_MEMBER]
Definition: catcache.h:155
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:291
#define Anum_pg_class_relkind
Definition: pg_class.h:118
List * GetRelationPublications(Oid relid)
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:416
Publication * GetPublication(Oid pubid)
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:162
static void check_publication_add_relation(Relation targetrel)
#define Anum_pg_publication_rel_prpubid
#define Anum_pg_publication_rel_prrelid
#define RowExclusiveLock
Definition: lockdefs.h:38
int errdetail(const char *fmt,...)
Definition: elog.c:873
#define CStringGetDatum(X)
Definition: postgres.h:584
#define RelationGetRelationName(relation)
Definition: rel.h:437
static ListCell * list_head(const List *l)
Definition: pg_list.h:77
#define SearchSysCacheList1(cacheId, key1)
Definition: syscache.h:201
HeapScanDesc heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
Definition: heapam.c:1399
#define lnext(lc)
Definition: pg_list.h:105
#define ereport(elevel, rest)
Definition: elog.h:122
#define PublicationRelPrrelidPrpubidIndexId
Definition: indexing.h:351
#define RELKIND_PARTITIONED_TABLE
Definition: pg_class.h:168
#define ReleaseSysCacheList(x)
Definition: syscache.h:210
char * get_publication_name(Oid pubid)
uintptr_t Datum
Definition: postgres.h:372
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1117
HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction)
Definition: heapam.c:1794
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1284
#define BoolGetDatum(X)
Definition: postgres.h:408
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:229
List * GetAllTablesPublications(void)
List * GetPublicationRelations(Oid pubid)
MemoryContext multi_call_memory_ctx
Definition: funcapi.h:109
#define SearchSysCacheExists2(cacheId, key1, key2)
Definition: syscache.h:176
#define RelationNeedsWAL(relation)
Definition: rel.h:506
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define CharGetDatum(X)
Definition: postgres.h:422
void CacheInvalidateRelcache(Relation relation)
Definition: inval.c:1233
static Datum values[MAXATTR]
Definition: bootstrap.c:163
char * text_to_cstring(const text *t)
Definition: varlena.c:182
FormData_pg_class * Form_pg_class
Definition: pg_class.h:95
void * user_fctx
Definition: funcapi.h:90
const ObjectAddress InvalidObjectAddress
Datum pg_get_publication_tables(PG_FUNCTION_ARGS)
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define Anum_pg_publication_puballtables
FormData_pg_publication_rel * Form_pg_publication_rel
int i
#define Natts_pg_publication_rel
#define NameStr(name)
Definition: c.h:499
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
static bool is_publishable_class(Oid relid, Form_pg_class reltuple)
Oid get_publication_oid(const char *pubname, bool missing_ok)
#define PG_FUNCTION_ARGS
Definition: fmgr.h:158
HeapTupleData tuple
Definition: catcache.h:116
ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists)
#define elog
Definition: elog.h:219
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
FormData_pg_publication * Form_pg_publication
#define RELKIND_RELATION
Definition: pg_class.h:160
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:31
Definition: pg_list.h:45
#define RelationGetRelid(relation)
Definition: rel.h:417
#define BTEqualStrategyNumber
Definition: stratnum.h:31
#define lfirst_oid(lc)
Definition: pg_list.h:108
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:309
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:287