PostgreSQL Source Code  git master
pg_publication.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_publication.c
4  * publication C API manipulation
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  * pg_publication.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/tableam.h"
21 #include "access/xact.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/index.h"
25 #include "catalog/indexing.h"
26 #include "catalog/namespace.h"
27 #include "catalog/objectaccess.h"
28 #include "catalog/objectaddress.h"
29 #include "catalog/pg_publication.h"
31 #include "catalog/pg_type.h"
32 #include "funcapi.h"
33 #include "miscadmin.h"
34 #include "utils/array.h"
35 #include "utils/builtins.h"
36 #include "utils/catcache.h"
37 #include "utils/fmgroids.h"
38 #include "utils/inval.h"
39 #include "utils/lsyscache.h"
40 #include "utils/rel.h"
41 #include "utils/syscache.h"
42 
43 /*
44  * Check if relation can be in given publication and throws appropriate
45  * error if not.
46  */
47 static void
49 {
50  /* Give more specific error for partitioned tables */
51  if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
52  ereport(ERROR,
53  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
54  errmsg("\"%s\" is a partitioned table",
55  RelationGetRelationName(targetrel)),
56  errdetail("Adding partitioned tables to publications is not supported."),
57  errhint("You can add the table partitions individually.")));
58 
59  /* Must be table */
60  if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
61  ereport(ERROR,
62  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
63  errmsg("\"%s\" is not a table",
64  RelationGetRelationName(targetrel)),
65  errdetail("Only tables can be added to publications.")));
66 
67  /* Can't be system table */
68  if (IsCatalogRelation(targetrel))
69  ereport(ERROR,
70  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
71  errmsg("\"%s\" is a system table",
72  RelationGetRelationName(targetrel)),
73  errdetail("System tables cannot be added to publications.")));
74 
75  /* UNLOGGED and TEMP relations cannot be part of publication. */
76  if (!RelationNeedsWAL(targetrel))
77  ereport(ERROR,
78  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
79  errmsg("table \"%s\" cannot be replicated",
80  RelationGetRelationName(targetrel)),
81  errdetail("Temporary and unlogged relations cannot be replicated.")));
82 }
83 
84 /*
85  * Returns if relation represented by oid and Form_pg_class entry
86  * is publishable.
87  *
88  * Does same checks as the above, but does not need relation to be opened
89  * and also does not throw errors.
90  *
91  * XXX This also excludes all tables with relid < FirstNormalObjectId,
92  * ie all tables created during initdb. This mainly affects the preinstalled
93  * information_schema. IsCatalogRelationOid() only excludes tables with
94  * relid < FirstBootstrapObjectId, making that test rather redundant,
95  * but really we should get rid of the FirstNormalObjectId test not
96  * IsCatalogRelationOid. We can't do so today because we don't want
97  * information_schema tables to be considered publishable; but this test
98  * is really inadequate for that, since the information_schema could be
99  * dropped and reloaded and then it'll be considered publishable. The best
100  * long-term solution may be to add a "relispublishable" bool to pg_class,
101  * and depend on that instead of OID checks.
102  */
103 static bool
105 {
106  return reltuple->relkind == RELKIND_RELATION &&
107  !IsCatalogRelationOid(relid) &&
108  reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
109  relid >= FirstNormalObjectId;
110 }
111 
112 /*
113  * Another variant of this, taking a Relation.
114  */
115 bool
117 {
118  return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
119 }
120 
121 
122 /*
123  * SQL-callable variant of the above
124  *
125  * This returns null when the relation does not exist. This is intended to be
126  * used for example in psql to avoid gratuitous errors when there are
127  * concurrent catalog changes.
128  */
129 Datum
131 {
132  Oid relid = PG_GETARG_OID(0);
133  HeapTuple tuple;
134  bool result;
135 
136  tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
137  if (!HeapTupleIsValid(tuple))
138  PG_RETURN_NULL();
139  result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
140  ReleaseSysCache(tuple);
141  PG_RETURN_BOOL(result);
142 }
143 
144 
145 /*
146  * Insert new publication / relation mapping.
147  */
150  bool if_not_exists)
151 {
152  Relation rel;
153  HeapTuple tup;
154  Datum values[Natts_pg_publication_rel];
155  bool nulls[Natts_pg_publication_rel];
156  Oid relid = RelationGetRelid(targetrel);
157  Oid prrelid;
158  Publication *pub = GetPublication(pubid);
159  ObjectAddress myself,
160  referenced;
161 
162  rel = table_open(PublicationRelRelationId, RowExclusiveLock);
163 
164  /*
165  * Check for duplicates. Note that this does not really prevent
166  * duplicates, it's here just to provide nicer error message in common
167  * case. The real protection is the unique key on the catalog.
168  */
170  ObjectIdGetDatum(pubid)))
171  {
173 
174  if (if_not_exists)
175  return InvalidObjectAddress;
176 
177  ereport(ERROR,
179  errmsg("relation \"%s\" is already member of publication \"%s\"",
180  RelationGetRelationName(targetrel), pub->name)));
181  }
182 
184 
185  /* Form a tuple. */
186  memset(values, 0, sizeof(values));
187  memset(nulls, false, sizeof(nulls));
188 
190  Anum_pg_publication_rel_oid);
191  values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid);
192  values[Anum_pg_publication_rel_prpubid - 1] =
193  ObjectIdGetDatum(pubid);
194  values[Anum_pg_publication_rel_prrelid - 1] =
195  ObjectIdGetDatum(relid);
196 
197  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
198 
199  /* Insert tuple into catalog. */
200  CatalogTupleInsert(rel, tup);
201  heap_freetuple(tup);
202 
203  ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
204 
205  /* Add dependency on the publication */
206  ObjectAddressSet(referenced, PublicationRelationId, pubid);
207  recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
208 
209  /* Add dependency on the relation */
210  ObjectAddressSet(referenced, RelationRelationId, relid);
211  recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
212 
213  /* Close the table. */
215 
216  /* Invalidate relcache so that publication info is rebuilt. */
217  CacheInvalidateRelcache(targetrel);
218 
219  return myself;
220 }
221 
222 
223 /*
224  * Gets list of publication oids for a relation oid.
225  */
226 List *
228 {
229  List *result = NIL;
230  CatCList *pubrellist;
231  int i;
232 
233  /* Find all publications associated with the relation. */
235  ObjectIdGetDatum(relid));
236  for (i = 0; i < pubrellist->n_members; i++)
237  {
238  HeapTuple tup = &pubrellist->members[i]->tuple;
239  Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid;
240 
241  result = lappend_oid(result, pubid);
242  }
243 
244  ReleaseSysCacheList(pubrellist);
245 
246  return result;
247 }
248 
249 /*
250  * Gets list of relation oids for a publication.
251  *
252  * This should only be used for normal publications, the FOR ALL TABLES
253  * should use GetAllTablesPublicationRelations().
254  */
255 List *
257 {
258  List *result;
259  Relation pubrelsrel;
260  ScanKeyData scankey;
261  SysScanDesc scan;
262  HeapTuple tup;
263 
264  /* Find all publications associated with the relation. */
265  pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
266 
267  ScanKeyInit(&scankey,
268  Anum_pg_publication_rel_prpubid,
269  BTEqualStrategyNumber, F_OIDEQ,
270  ObjectIdGetDatum(pubid));
271 
273  true, NULL, 1, &scankey);
274 
275  result = NIL;
276  while (HeapTupleIsValid(tup = systable_getnext(scan)))
277  {
279 
280  pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
281 
282  result = lappend_oid(result, pubrel->prrelid);
283  }
284 
285  systable_endscan(scan);
286  table_close(pubrelsrel, AccessShareLock);
287 
288  return result;
289 }
290 
291 /*
292  * Gets list of publication oids for publications marked as FOR ALL TABLES.
293  */
294 List *
296 {
297  List *result;
298  Relation rel;
299  ScanKeyData scankey;
300  SysScanDesc scan;
301  HeapTuple tup;
302 
303  /* Find all publications that are marked as for all tables. */
304  rel = table_open(PublicationRelationId, AccessShareLock);
305 
306  ScanKeyInit(&scankey,
307  Anum_pg_publication_puballtables,
308  BTEqualStrategyNumber, F_BOOLEQ,
309  BoolGetDatum(true));
310 
311  scan = systable_beginscan(rel, InvalidOid, false,
312  NULL, 1, &scankey);
313 
314  result = NIL;
315  while (HeapTupleIsValid(tup = systable_getnext(scan)))
316  {
317  Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
318 
319  result = lappend_oid(result, oid);
320  }
321 
322  systable_endscan(scan);
324 
325  return result;
326 }
327 
328 /*
329  * Gets list of all relation published by FOR ALL TABLES publication(s).
330  */
331 List *
333 {
334  Relation classRel;
335  ScanKeyData key[1];
336  TableScanDesc scan;
337  HeapTuple tuple;
338  List *result = NIL;
339 
340  classRel = table_open(RelationRelationId, AccessShareLock);
341 
342  ScanKeyInit(&key[0],
343  Anum_pg_class_relkind,
344  BTEqualStrategyNumber, F_CHAREQ,
345  CharGetDatum(RELKIND_RELATION));
346 
347  scan = table_beginscan_catalog(classRel, 1, key);
348 
349  while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
350  {
351  Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
352  Oid relid = relForm->oid;
353 
354  if (is_publishable_class(relid, relForm))
355  result = lappend_oid(result, relid);
356  }
357 
358  table_endscan(scan);
359  table_close(classRel, AccessShareLock);
360 
361  return result;
362 }
363 
364 /*
365  * Get publication using oid
366  *
367  * The Publication struct and its data are palloc'ed here.
368  */
369 Publication *
371 {
372  HeapTuple tup;
373  Publication *pub;
374  Form_pg_publication pubform;
375 
377 
378  if (!HeapTupleIsValid(tup))
379  elog(ERROR, "cache lookup failed for publication %u", pubid);
380 
381  pubform = (Form_pg_publication) GETSTRUCT(tup);
382 
383  pub = (Publication *) palloc(sizeof(Publication));
384  pub->oid = pubid;
385  pub->name = pstrdup(NameStr(pubform->pubname));
386  pub->alltables = pubform->puballtables;
387  pub->pubactions.pubinsert = pubform->pubinsert;
388  pub->pubactions.pubupdate = pubform->pubupdate;
389  pub->pubactions.pubdelete = pubform->pubdelete;
390  pub->pubactions.pubtruncate = pubform->pubtruncate;
391 
392  ReleaseSysCache(tup);
393 
394  return pub;
395 }
396 
397 
398 /*
399  * Get Publication using name.
400  */
401 Publication *
402 GetPublicationByName(const char *pubname, bool missing_ok)
403 {
404  Oid oid;
405 
406  oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
407  CStringGetDatum(pubname));
408  if (!OidIsValid(oid))
409  {
410  if (missing_ok)
411  return NULL;
412 
413  ereport(ERROR,
414  (errcode(ERRCODE_UNDEFINED_OBJECT),
415  errmsg("publication \"%s\" does not exist", pubname)));
416  }
417 
418  return GetPublication(oid);
419 }
420 
421 /*
422  * get_publication_oid - given a publication name, look up the OID
423  *
424  * If missing_ok is false, throw an error if name not found. If true, just
425  * return InvalidOid.
426  */
427 Oid
428 get_publication_oid(const char *pubname, bool missing_ok)
429 {
430  Oid oid;
431 
432  oid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
433  CStringGetDatum(pubname));
434  if (!OidIsValid(oid) && !missing_ok)
435  ereport(ERROR,
436  (errcode(ERRCODE_UNDEFINED_OBJECT),
437  errmsg("publication \"%s\" does not exist", pubname)));
438  return oid;
439 }
440 
441 /*
442  * get_publication_name - given a publication Oid, look up the name
443  *
444  * If missing_ok is false, throw an error if name not found. If true, just
445  * return NULL.
446  */
447 char *
448 get_publication_name(Oid pubid, bool missing_ok)
449 {
450  HeapTuple tup;
451  char *pubname;
452  Form_pg_publication pubform;
453 
455 
456  if (!HeapTupleIsValid(tup))
457  {
458  if (!missing_ok)
459  elog(ERROR, "cache lookup failed for publication %u", pubid);
460  return NULL;
461  }
462 
463  pubform = (Form_pg_publication) GETSTRUCT(tup);
464  pubname = pstrdup(NameStr(pubform->pubname));
465 
466  ReleaseSysCache(tup);
467 
468  return pubname;
469 }
470 
471 /*
472  * Returns Oids of tables in a publication.
473  */
474 Datum
476 {
477  FuncCallContext *funcctx;
478  char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
479  Publication *publication;
480  List *tables;
481 
482  /* stuff done only on the first call of the function */
483  if (SRF_IS_FIRSTCALL())
484  {
485  MemoryContext oldcontext;
486 
487  /* create a function context for cross-call persistence */
488  funcctx = SRF_FIRSTCALL_INIT();
489 
490  /* switch to memory context appropriate for multiple function calls */
491  oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
492 
493  publication = GetPublicationByName(pubname, false);
494  if (publication->alltables)
496  else
497  tables = GetPublicationRelations(publication->oid);
498  funcctx->user_fctx = (void *) tables;
499 
500  MemoryContextSwitchTo(oldcontext);
501  }
502 
503  /* stuff done on every call of the function */
504  funcctx = SRF_PERCALL_SETUP();
505  tables = (List *) funcctx->user_fctx;
506 
507  if (funcctx->call_cntr < list_length(tables))
508  {
509  Oid relid = list_nth_oid(tables, funcctx->call_cntr);
510 
511  SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
512  }
513 
514  SRF_RETURN_DONE(funcctx);
515 }
uint64 call_cntr
Definition: funcapi.h:65
#define NIL
Definition: pg_list.h:65
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:322
PublicationActions pubactions
bool IsCatalogRelation(Relation relation)
Definition: catalog.c:99
int n_members
Definition: catcache.h:176
List * GetAllTablesPublicationRelations(void)
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
int errhint(const char *fmt,...)
Definition: elog.c:1069
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:525
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
Definition: tableam.c:98
#define RelationGetDescr(relation)
Definition: rel.h:448
#define SRF_IS_FIRSTCALL()
Definition: funcapi.h:282
#define GetSysCacheOid1(cacheId, oidcol, key1)
Definition: syscache.h:192
char * pstrdup(const char *in)
Definition: mcxt.c:1186
#define RelationGetForm(relation)
Definition: rel.h:416
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define AccessShareLock
Definition: lockdefs.h:36
int errcode(int sqlerrcode)
Definition: elog.c:608
void recordDependencyOn(const ObjectAddress *depender, const ObjectAddress *referenced, DependencyType behavior)
Definition: pg_depend.c:43
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
Form_pg_class rd_rel
Definition: rel.h:83
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
List * lappend_oid(List *list, Oid datum)
Definition: list.c:358
#define OidIsValid(objectId)
Definition: c.h:645
#define SRF_PERCALL_SETUP()
Definition: funcapi.h:286
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:352
Publication * GetPublicationByName(const char *pubname, bool missing_ok)
char * get_publication_name(Oid pubid, bool missing_ok)
#define FirstNormalObjectId
Definition: transam.h:141
char relkind
Definition: pg_class.h:81
#define PG_GETARG_TEXT_PP(n)
Definition: fmgr.h:303
static Oid list_nth_oid(const List *list, int n)
Definition: pg_list.h:299
bool IsCatalogRelationOid(Oid relid)
Definition: catalog.c:116
CatCTup * members[FLEXIBLE_ARRAY_MEMBER]
Definition: catcache.h:178
#define SRF_RETURN_NEXT(_funcctx, _result)
Definition: funcapi.h:288
List * GetRelationPublications(Oid relid)
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:444
Publication * GetPublication(Oid pubid)
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
static void check_publication_add_relation(Relation targetrel)
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
Definition: heapam.c:1290
#define PG_GETARG_OID(n)
Definition: fmgr.h:270
bool is_publishable_relation(Relation rel)
#define RowExclusiveLock
Definition: lockdefs.h:38
int errdetail(const char *fmt,...)
Definition: elog.c:955
Datum pg_relation_is_publishable(PG_FUNCTION_ARGS)
#define CStringGetDatum(X)
Definition: postgres.h:578
#define RelationGetRelationName(relation)
Definition: rel.h:456
#define SearchSysCacheList1(cacheId, key1)
Definition: syscache.h:210
#define ereport(elevel, rest)
Definition: elog.h:141
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:1116
#define PublicationRelPrrelidPrpubidIndexId
Definition: indexing.h:358
#define ReleaseSysCacheList(x)
Definition: syscache.h:217
#define PG_RETURN_BOOL(x)
Definition: fmgr.h:349
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
#define BoolGetDatum(X)
Definition: postgres.h:402
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
List * GetAllTablesPublications(void)
List * GetPublicationRelations(Oid pubid)
MemoryContext multi_call_memory_ctx
Definition: funcapi.h:101
static int list_length(const List *l)
Definition: pg_list.h:169
#define SearchSysCacheExists2(cacheId, key1, key2)
Definition: syscache.h:185
#define RelationNeedsWAL(relation)
Definition: rel.h:524
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define CharGetDatum(X)
Definition: postgres.h:416
#define PublicationRelObjectIndexId
Definition: indexing.h:355
void CacheInvalidateRelcache(Relation relation)
Definition: inval.c:1270
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:831
static Datum values[MAXATTR]
Definition: bootstrap.c:167
char * text_to_cstring(const text *t)
Definition: varlena.c:204
FormData_pg_class * Form_pg_class
Definition: pg_class.h:150
void * user_fctx
Definition: funcapi.h:82
const ObjectAddress InvalidObjectAddress
Datum pg_get_publication_tables(PG_FUNCTION_ARGS)
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:822
#define elog(elevel,...)
Definition: elog.h:228
FormData_pg_publication_rel * Form_pg_publication_rel
int i
#define NameStr(name)
Definition: c.h:616
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
static bool is_publishable_class(Oid relid, Form_pg_class reltuple)
Oid get_publication_oid(const char *pubname, bool missing_ok)
#define PG_FUNCTION_ARGS
Definition: fmgr.h:188
HeapTupleData tuple
Definition: catcache.h:121
ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists)
FormData_pg_publication * Form_pg_publication
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:31
Definition: pg_list.h:50
#define RelationGetRelid(relation)
Definition: rel.h:422
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:183
#define PG_RETURN_NULL()
Definition: fmgr.h:335
#define BTEqualStrategyNumber
Definition: stratnum.h:31
#define SRF_RETURN_DONE(_funcctx)
Definition: funcapi.h:306
#define SRF_FIRSTCALL_INIT()
Definition: funcapi.h:284