PostgreSQL Source Code  git master
pg_publication.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_publication.c
4  * publication C API manipulation
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  * pg_publication.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "funcapi.h"
18 #include "miscadmin.h"
19 
20 #include "access/genam.h"
21 #include "access/hash.h"
22 #include "access/heapam.h"
23 #include "access/htup_details.h"
24 #include "access/xact.h"
25 
26 #include "catalog/catalog.h"
27 #include "catalog/dependency.h"
28 #include "catalog/index.h"
29 #include "catalog/indexing.h"
30 #include "catalog/namespace.h"
31 #include "catalog/objectaccess.h"
32 #include "catalog/objectaddress.h"
33 #include "catalog/pg_type.h"
34 #include "catalog/pg_publication.h"
36 
37 #include "utils/array.h"
38 #include "utils/builtins.h"
39 #include "utils/catcache.h"
40 #include "utils/fmgroids.h"
41 #include "utils/inval.h"
42 #include "utils/lsyscache.h"
43 #include "utils/rel.h"
44 #include "utils/syscache.h"
45 
46 /*
47  * Check if relation can be in given publication and throws appropriate
48  * error if not.
49  */
50 static void
52 {
53  /* Give more specific error for partitioned tables */
54  if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
55  ereport(ERROR,
56  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
57  errmsg("\"%s\" is a partitioned table",
58  RelationGetRelationName(targetrel)),
59  errdetail("Adding partitioned tables to publications is not supported."),
60  errhint("You can add the table partitions individually.")));
61 
62  /* Must be table */
63  if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
64  ereport(ERROR,
65  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66  errmsg("\"%s\" is not a table",
67  RelationGetRelationName(targetrel)),
68  errdetail("Only tables can be added to publications.")));
69 
70  /* Can't be system table */
71  if (IsCatalogRelation(targetrel))
72  ereport(ERROR,
73  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74  errmsg("\"%s\" is a system table",
75  RelationGetRelationName(targetrel)),
76  errdetail("System tables cannot be added to publications.")));
77 
78  /* UNLOGGED and TEMP relations cannot be part of publication. */
79  if (!RelationNeedsWAL(targetrel))
80  ereport(ERROR,
81  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
82  errmsg("table \"%s\" cannot be replicated",
83  RelationGetRelationName(targetrel)),
84  errdetail("Temporary and unlogged relations cannot be replicated.")));
85 }
86 
87 /*
88  * Returns if relation represented by oid and Form_pg_class entry
89  * is publishable.
90  *
91  * Does same checks as the above, but does not need relation to be opened
92  * and also does not throw errors.
93  *
94  * Note this also excludes all tables with relid < FirstNormalObjectId,
95  * ie all tables created during initdb. This mainly affects the preinstalled
96  * information_schema. (IsCatalogClass() only checks for these inside
97  * pg_catalog and toast schemas.)
98  */
99 static bool
101 {
102  return reltuple->relkind == RELKIND_RELATION &&
103  !IsCatalogClass(relid, reltuple) &&
104  reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
105  relid >= FirstNormalObjectId;
106 }
107 
108 /*
109  * Another variant of this, taking a Relation.
110  */
111 bool
113 {
114  return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
115 }
116 
117 
118 /*
119  * SQL-callable variant of the above
120  *
121  * This returns null when the relation does not exist. This is intended to be
122  * used for example in psql to avoid gratuitous errors when there are
123  * concurrent catalog changes.
124  */
125 Datum
127 {
128  Oid relid = PG_GETARG_OID(0);
129  HeapTuple tuple;
130  bool result;
131 
132  tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
133  if (!tuple)
134  PG_RETURN_NULL();
135  result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
136  ReleaseSysCache(tuple);
137  PG_RETURN_BOOL(result);
138 }
139 
140 
141 /*
142  * Insert new publication / relation mapping.
143  */
146  bool if_not_exists)
147 {
148  Relation rel;
149  HeapTuple tup;
150  Datum values[Natts_pg_publication_rel];
151  bool nulls[Natts_pg_publication_rel];
152  Oid relid = RelationGetRelid(targetrel);
153  Oid prrelid;
154  Publication *pub = GetPublication(pubid);
155  ObjectAddress myself,
156  referenced;
157 
158  rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
159 
160  /*
161  * Check for duplicates. Note that this does not really prevent
162  * duplicates, it's here just to provide nicer error message in common
163  * case. The real protection is the unique key on the catalog.
164  */
166  ObjectIdGetDatum(pubid)))
167  {
169 
170  if (if_not_exists)
171  return InvalidObjectAddress;
172 
173  ereport(ERROR,
175  errmsg("relation \"%s\" is already member of publication \"%s\"",
176  RelationGetRelationName(targetrel), pub->name)));
177  }
178 
180 
181  /* Form a tuple. */
182  memset(values, 0, sizeof(values));
183  memset(nulls, false, sizeof(nulls));
184 
185  values[Anum_pg_publication_rel_prpubid - 1] =
186  ObjectIdGetDatum(pubid);
187  values[Anum_pg_publication_rel_prrelid - 1] =
188  ObjectIdGetDatum(relid);
189 
190  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
191 
192  /* Insert tuple into catalog. */
193  prrelid = CatalogTupleInsert(rel, tup);
194  heap_freetuple(tup);
195 
196  ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
197 
198  /* Add dependency on the publication */
199  ObjectAddressSet(referenced, PublicationRelationId, pubid);
200  recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
201 
202  /* Add dependency on the relation */
203  ObjectAddressSet(referenced, RelationRelationId, relid);
204  recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
205 
206  /* Close the table. */
208 
209  /* Invalidate relcache so that publication info is rebuilt. */
210  CacheInvalidateRelcache(targetrel);
211 
212  return myself;
213 }
214 
215 
216 /*
217  * Gets list of publication oids for a relation oid.
218  */
219 List *
221 {
222  List *result = NIL;
223  CatCList *pubrellist;
224  int i;
225 
226  /* Find all publications associated with the relation. */
228  ObjectIdGetDatum(relid));
229  for (i = 0; i < pubrellist->n_members; i++)
230  {
231  HeapTuple tup = &pubrellist->members[i]->tuple;
232  Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
233 
234  result = lappend_oid(result, pubid);
235  }
236 
237  ReleaseSysCacheList(pubrellist);
238 
239  return result;
240 }
241 
242 /*
243  * Gets list of relation oids for a publication.
244  *
245  * This should only be used for normal publications, the FOR ALL TABLES
246  * should use GetAllTablesPublicationRelations().
247  */
248 List *
250 {
251  List *result;
252  Relation pubrelsrel;
253  ScanKeyData scankey;
254  SysScanDesc scan;
255  HeapTuple tup;
256 
257  /* Find all publications associated with the relation. */
258  pubrelsrel = heap_open(PublicationRelRelationId, AccessShareLock);
259 
260  ScanKeyInit(&scankey,
261  Anum_pg_publication_rel_prpubid,
262  BTEqualStrategyNumber, F_OIDEQ,
263  ObjectIdGetDatum(pubid));
264 
266  true, NULL, 1, &scankey);
267 
268  result = NIL;
269  while (HeapTupleIsValid(tup = systable_getnext(scan)))
270  {
272 
273  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
274 
275  result = lappend_oid(result, pubrel->prrelid);
276  }
277 
278  systable_endscan(scan);
279  heap_close(pubrelsrel, AccessShareLock);
280 
281  return result;
282 }
283 
284 /*
285  * Gets list of publication oids for publications marked as FOR ALL TABLES.
286  */
287 List *
289 {
290  List *result;
291  Relation rel;
292  ScanKeyData scankey;
293  SysScanDesc scan;
294  HeapTuple tup;
295 
296  /* Find all publications that are marked as for all tables. */
297  rel = heap_open(PublicationRelationId, AccessShareLock);
298 
299  ScanKeyInit(&scankey,
300  Anum_pg_publication_puballtables,
301  BTEqualStrategyNumber, F_BOOLEQ,
302  BoolGetDatum(true));
303 
304  scan = systable_beginscan(rel, InvalidOid, false,
305  NULL, 1, &scankey);
306 
307  result = NIL;
308  while (HeapTupleIsValid(tup = systable_getnext(scan)))
309  result = lappend_oid(result, HeapTupleGetOid(tup));
310 
311  systable_endscan(scan);
313 
314  return result;
315 }
316 
317 /*
318  * Gets list of all relation published by FOR ALL TABLES publication(s).
319  */
320 List *
322 {
323  Relation classRel;
324  ScanKeyData key[1];
325  HeapScanDesc scan;
326  HeapTuple tuple;
327  List *result = NIL;
328 
329  classRel = heap_open(RelationRelationId, AccessShareLock);
330 
331  ScanKeyInit(&key[0],
332  Anum_pg_class_relkind,
333  BTEqualStrategyNumber, F_CHAREQ,
334  CharGetDatum(RELKIND_RELATION));
335 
336  scan = heap_beginscan_catalog(classRel, 1, key);
337 
338  while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
339  {
340  Oid relid = HeapTupleGetOid(tuple);
341  Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
342 
343  if (is_publishable_class(relid, relForm))
344  result = lappend_oid(result, relid);
345  }
346 
347  heap_endscan(scan);
348  heap_close(classRel, AccessShareLock);
349 
350  return result;
351 }
352 
353 /*
354  * Get publication using oid
355  *
356  * The Publication struct and its data are palloc'ed here.
357  */
358 Publication *
360 {
361  HeapTuple tup;
362  Publication *pub;
363  Form_pg_publication pubform;
364 
366 
367  if (!HeapTupleIsValid(tup))
368  elog(ERROR, "cache lookup failed for publication %u", pubid);
369 
370  pubform = (Form_pg_publication) GETSTRUCT(tup);
371 
372  pub = (Publication *) palloc(sizeof(Publication));
373  pub->oid = pubid;
374  pub->name = pstrdup(NameStr(pubform->pubname));
375  pub->alltables = pubform->puballtables;
376  pub->pubactions.pubinsert = pubform->pubinsert;
377  pub->pubactions.pubupdate = pubform->pubupdate;
378  pub->pubactions.pubdelete = pubform->pubdelete;
379  pub->pubactions.pubtruncate = pubform->pubtruncate;
380 
381  ReleaseSysCache(tup);
382 
383  return pub;
384 }
385 
386 
387 /*
388  * Get Publication using name.
389  */
390 Publication *
391 GetPublicationByName(const char *pubname, bool missing_ok)
392 {
393  Oid oid;
394 
396  if (!OidIsValid(oid))
397  {
398  if (missing_ok)
399  return NULL;
400 
401  ereport(ERROR,
402  (errcode(ERRCODE_UNDEFINED_OBJECT),
403  errmsg("publication \"%s\" does not exist", pubname)));
404  }
405 
406  return GetPublication(oid);
407 }
408 
409 /*
410  * get_publication_oid - given a publication name, look up the OID
411  *
412  * If missing_ok is false, throw an error if name not found. If true, just
413  * return InvalidOid.
414  */
415 Oid
416 get_publication_oid(const char *pubname, bool missing_ok)
417 {
418  Oid oid;
419 
421  if (!OidIsValid(oid) && !missing_ok)
422  ereport(ERROR,
423  (errcode(ERRCODE_UNDEFINED_OBJECT),
424  errmsg("publication \"%s\" does not exist", pubname)));
425  return oid;
426 }
427 
428 /*
429  * get_publication_name - given a publication Oid, look up the name
430  */
431 char *
433 {
434  HeapTuple tup;
435  char *pubname;
436  Form_pg_publication pubform;
437 
439 
440  if (!HeapTupleIsValid(tup))
441  elog(ERROR, "cache lookup failed for publication %u", pubid);
442 
443  pubform = (Form_pg_publication) GETSTRUCT(tup);
444  pubname = pstrdup(NameStr(pubform->pubname));
445 
446  ReleaseSysCache(tup);
447 
448  return pubname;
449 }
450 
451 /*
452  * Returns Oids of tables in a publication.
453  */
454 Datum
456 {
457  FuncCallContext *funcctx;
458  char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
459  Publication *publication;
460  List *tables;
461  ListCell **lcp;
462 
463  /* stuff done only on the first call of the function */
464  if (SRF_IS_FIRSTCALL())
465  {
466  MemoryContext oldcontext;
467 
468  /* create a function context for cross-call persistence */
469  funcctx = SRF_FIRSTCALL_INIT();
470 
471  /* switch to memory context appropriate for multiple function calls */
472  oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
473 
474  publication = GetPublicationByName(pubname, false);
475  if (publication->alltables)
477  else
478  tables = GetPublicationRelations(publication->oid);
479  lcp = (ListCell **) palloc(sizeof(ListCell *));
480  *lcp = list_head(tables);
481  funcctx->user_fctx = (void *) lcp;
482 
483  MemoryContextSwitchTo(oldcontext);
484  }
485 
486  /* stuff done on every call of the function */
487  funcctx = SRF_PERCALL_SETUP();
488  lcp = (ListCell **) funcctx->user_fctx;
489 
490  while (*lcp != NULL)
491  {
492  Oid relid = lfirst_oid(*lcp);
493 
494  *lcp = lnext(*lcp);
495  SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
496  }
497 
498  SRF_RETURN_DONE(funcctx);
499 }
#define NIL
Definition: pg_list.h:69
PublicationActions pubactions
bool IsCatalogRelation(Relation relation)
Definition: catalog.c:92
int n_members
Definition: catcache.h:176
List * GetAllTablesPublicationRelations(void)
int errhint(const char *fmt,...)
Definition: elog.c:987
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:502
#define GETSTRUCT(TUP)
Definition: htup_details.h:668
void heap_endscan(HeapScanDesc scan)
Definition: heapam.c:1572
bool IsCatalogClass(Oid relid, Form_pg_class reltuple)
Definition: catalog.c:104
#define RelationGetDescr(relation)
Definition: rel.h:433
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:294
char * pstrdup(const char *in)
Definition: mcxt.c:1161
#define RelationGetForm(relation)
Definition: rel.h:401
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
int errcode(int sqlerrcode)
Definition: elog.c:575
void recordDependencyOn(const ObjectAddress *depender, const ObjectAddress *referenced, DependencyType behavior)
Definition: pg_depend.c:44
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1074
#define heap_close(r, l)
Definition: heapam.h:97
Form_pg_class rd_rel
Definition: rel.h:84
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1773
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:164
#define OidIsValid(objectId)
Definition: c.h:605
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:298
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:331
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
#define FirstNormalObjectId
Definition: transam.h:94
char relkind
Definition: pg_class.h:51
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:278
#define GetSysCacheOid1(cacheId, key1)
Definition: syscache.h:191
CatCTup * members[FLEXIBLE_ARRAY_MEMBER]
Definition: catcache.h:178
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:300
List * GetRelationPublications(Oid relid)
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:419
Publication * GetPublication(Oid pubid)
#define ObjectIdGetDatum(X)
Definition: postgres.h:492
#define ERROR
Definition: elog.h:43
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:163
static void check_publication_add_relation(Relation targetrel)
#define PG_GETARG_OID(n)
Definition: fmgr.h:245
bool is_publishable_relation(Relation rel)
#define RowExclusiveLock
Definition: lockdefs.h:38
int errdetail(const char *fmt,...)
Definition: elog.c:873
Datum pg_relation_is_publishable(PG_FUNCTION_ARGS)
#define CStringGetDatum(X)
Definition: postgres.h:563
#define RelationGetRelationName(relation)
Definition: rel.h:441
static ListCell * list_head(const List *l)
Definition: pg_list.h:77
#define SearchSysCacheList1(cacheId, key1)
Definition: syscache.h:209
HeapScanDesc heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
Definition: heapam.c:1412
#define lnext(lc)
Definition: pg_list.h:105
#define ereport(elevel, rest)
Definition: elog.h:122
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1112
#define PublicationRelPrrelidPrpubidIndexId
Definition: indexing.h:352
#define ReleaseSysCacheList(x)
Definition: syscache.h:216
char * get_publication_name(Oid pubid)
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:324
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1160
HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction)
Definition: heapam.c:1835
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1294
#define BoolGetDatum(X)
Definition: postgres.h:387
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
List * GetAllTablesPublications(void)
List * GetPublicationRelations(Oid pubid)
MemoryContext multi_call_memory_ctx
Definition: funcapi.h:110
#define SearchSysCacheExists2(cacheId, key1, key2)
Definition: syscache.h:184
#define RelationNeedsWAL(relation)
Definition: rel.h:510
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define CharGetDatum(X)
Definition: postgres.h:401
void CacheInvalidateRelcache(Relation relation)
Definition: inval.c:1233
static Datum values[MAXATTR]
Definition: bootstrap.c:164
char * text_to_cstring(const text *t)
Definition: varlena.c:182
FormData_pg_class * Form_pg_class
Definition: pg_class.h:93
void * user_fctx
Definition: funcapi.h:91
const ObjectAddress InvalidObjectAddress
Datum pg_get_publication_tables(PG_FUNCTION_ARGS)
void * palloc(Size size)
Definition: mcxt.c:924
int errmsg(const char *fmt,...)
Definition: elog.c:797
FormData_pg_publication_rel * Form_pg_publication_rel
int i
#define NameStr(name)
Definition: c.h:576
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
static bool is_publishable_class(Oid relid, Form_pg_class reltuple)
Oid get_publication_oid(const char *pubname, bool missing_ok)
#define PG_FUNCTION_ARGS
Definition: fmgr.h:163
HeapTupleData tuple
Definition: catcache.h:121
ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists)
#define elog
Definition: elog.h:219
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:707
FormData_pg_publication * Form_pg_publication
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
Definition: pg_list.h:45
#define RelationGetRelid(relation)
Definition: rel.h:407
#define PG_RETURN_NULL()
Definition: fmgr.h:310
#define BTEqualStrategyNumber
Definition: stratnum.h:31
#define lfirst_oid(lc)
Definition: pg_list.h:108
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:318
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:296