PostgreSQL Source Code  git master
pg_subscription.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_subscription.c
4  * replication subscriptions
5  *
6  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  * src/backend/catalog/pg_subscription.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/tableam.h"
21 #include "access/xact.h"
22 #include "catalog/indexing.h"
25 #include "catalog/pg_type.h"
26 #include "miscadmin.h"
27 #include "nodes/makefuncs.h"
28 #include "storage/lmgr.h"
29 #include "utils/array.h"
30 #include "utils/builtins.h"
31 #include "utils/fmgroids.h"
32 #include "utils/lsyscache.h"
33 #include "utils/pg_lsn.h"
34 #include "utils/rel.h"
35 #include "utils/syscache.h"
36 
37 static List *textarray_to_stringlist(ArrayType *textarray);
38 
39 /*
40  * Fetch the subscription from the syscache.
41  */
43 GetSubscription(Oid subid, bool missing_ok)
44 {
45  HeapTuple tup;
46  Subscription *sub;
47  Form_pg_subscription subform;
48  Datum datum;
49  bool isnull;
50 
52 
53  if (!HeapTupleIsValid(tup))
54  {
55  if (missing_ok)
56  return NULL;
57 
58  elog(ERROR, "cache lookup failed for subscription %u", subid);
59  }
60 
61  subform = (Form_pg_subscription) GETSTRUCT(tup);
62 
63  sub = (Subscription *) palloc(sizeof(Subscription));
64  sub->oid = subid;
65  sub->dbid = subform->subdbid;
66  sub->skiplsn = subform->subskiplsn;
67  sub->name = pstrdup(NameStr(subform->subname));
68  sub->owner = subform->subowner;
69  sub->enabled = subform->subenabled;
70  sub->binary = subform->subbinary;
71  sub->stream = subform->substream;
72  sub->twophasestate = subform->subtwophasestate;
73  sub->disableonerr = subform->subdisableonerr;
74  sub->passwordrequired = subform->subpasswordrequired;
75  sub->runasowner = subform->subrunasowner;
76 
77  /* Get conninfo */
79  tup,
80  Anum_pg_subscription_subconninfo);
81  sub->conninfo = TextDatumGetCString(datum);
82 
83  /* Get slotname */
85  tup,
86  Anum_pg_subscription_subslotname,
87  &isnull);
88  if (!isnull)
89  sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
90  else
91  sub->slotname = NULL;
92 
93  /* Get synccommit */
95  tup,
96  Anum_pg_subscription_subsynccommit);
97  sub->synccommit = TextDatumGetCString(datum);
98 
99  /* Get publications */
101  tup,
102  Anum_pg_subscription_subpublications);
104 
105  /* Get origin */
107  tup,
108  Anum_pg_subscription_suborigin);
109  sub->origin = TextDatumGetCString(datum);
110 
111  ReleaseSysCache(tup);
112 
113  return sub;
114 }
115 
116 /*
117  * Return number of subscriptions defined in given database.
118  * Used by dropdb() to check if database can indeed be dropped.
119  */
120 int
122 {
123  int nsubs = 0;
124  Relation rel;
125  ScanKeyData scankey;
126  SysScanDesc scan;
127  HeapTuple tup;
128 
129  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
130 
131  ScanKeyInit(&scankey,
132  Anum_pg_subscription_subdbid,
133  BTEqualStrategyNumber, F_OIDEQ,
134  ObjectIdGetDatum(dbid));
135 
136  scan = systable_beginscan(rel, InvalidOid, false,
137  NULL, 1, &scankey);
138 
139  while (HeapTupleIsValid(tup = systable_getnext(scan)))
140  nsubs++;
141 
142  systable_endscan(scan);
143 
144  table_close(rel, NoLock);
145 
146  return nsubs;
147 }
148 
149 /*
150  * Free memory allocated by subscription struct.
151  */
152 void
154 {
155  pfree(sub->name);
156  pfree(sub->conninfo);
157  if (sub->slotname)
158  pfree(sub->slotname);
160  pfree(sub);
161 }
162 
163 /*
164  * Disable the given subscription.
165  */
166 void
168 {
169  Relation rel;
170  bool nulls[Natts_pg_subscription];
171  bool replaces[Natts_pg_subscription];
172  Datum values[Natts_pg_subscription];
173  HeapTuple tup;
174 
175  /* Look up the subscription in the catalog */
176  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
178 
179  if (!HeapTupleIsValid(tup))
180  elog(ERROR, "cache lookup failed for subscription %u", subid);
181 
182  LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
183 
184  /* Form a new tuple. */
185  memset(values, 0, sizeof(values));
186  memset(nulls, false, sizeof(nulls));
187  memset(replaces, false, sizeof(replaces));
188 
189  /* Set the subscription to disabled. */
190  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
191  replaces[Anum_pg_subscription_subenabled - 1] = true;
192 
193  /* Update the catalog */
194  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
195  replaces);
196  CatalogTupleUpdate(rel, &tup->t_self, tup);
197  heap_freetuple(tup);
198 
199  table_close(rel, NoLock);
200 }
201 
202 /*
203  * Convert text array to list of strings.
204  *
205  * Note: the resulting list of strings is pallocated here.
206  */
207 static List *
209 {
210  Datum *elems;
211  int nelems,
212  i;
213  List *res = NIL;
214 
215  deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
216 
217  if (nelems == 0)
218  return NIL;
219 
220  for (i = 0; i < nelems; i++)
222 
223  return res;
224 }
225 
226 /*
227  * Add new state record for a subscription table.
228  */
229 void
231  XLogRecPtr sublsn)
232 {
233  Relation rel;
234  HeapTuple tup;
235  bool nulls[Natts_pg_subscription_rel];
236  Datum values[Natts_pg_subscription_rel];
237 
238  LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
239 
240  rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
241 
242  /* Try finding existing mapping. */
244  ObjectIdGetDatum(relid),
245  ObjectIdGetDatum(subid));
246  if (HeapTupleIsValid(tup))
247  elog(ERROR, "subscription table %u in subscription %u already exists",
248  relid, subid);
249 
250  /* Form the tuple. */
251  memset(values, 0, sizeof(values));
252  memset(nulls, false, sizeof(nulls));
253  values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
254  values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
255  values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
256  if (sublsn != InvalidXLogRecPtr)
257  values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
258  else
259  nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
260 
261  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
262 
263  /* Insert tuple into catalog. */
264  CatalogTupleInsert(rel, tup);
265 
266  heap_freetuple(tup);
267 
268  /* Cleanup. */
269  table_close(rel, NoLock);
270 }
271 
272 /*
273  * Update the state of a subscription table.
274  */
275 void
277  XLogRecPtr sublsn)
278 {
279  Relation rel;
280  HeapTuple tup;
281  bool nulls[Natts_pg_subscription_rel];
282  Datum values[Natts_pg_subscription_rel];
283  bool replaces[Natts_pg_subscription_rel];
284 
285  LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
286 
287  rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
288 
289  /* Try finding existing mapping. */
291  ObjectIdGetDatum(relid),
292  ObjectIdGetDatum(subid));
293  if (!HeapTupleIsValid(tup))
294  elog(ERROR, "subscription table %u in subscription %u does not exist",
295  relid, subid);
296 
297  /* Update the tuple. */
298  memset(values, 0, sizeof(values));
299  memset(nulls, false, sizeof(nulls));
300  memset(replaces, false, sizeof(replaces));
301 
302  replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
303  values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
304 
305  replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
306  if (sublsn != InvalidXLogRecPtr)
307  values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
308  else
309  nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
310 
311  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
312  replaces);
313 
314  /* Update the catalog. */
315  CatalogTupleUpdate(rel, &tup->t_self, tup);
316 
317  /* Cleanup. */
318  table_close(rel, NoLock);
319 }
320 
321 /*
322  * Get state of subscription table.
323  *
324  * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
325  */
326 char
328 {
329  HeapTuple tup;
330  char substate;
331  bool isnull;
332  Datum d;
333  Relation rel;
334 
335  /*
336  * This is to avoid the race condition with AlterSubscription which tries
337  * to remove this relstate.
338  */
339  rel = table_open(SubscriptionRelRelationId, AccessShareLock);
340 
341  /* Try finding the mapping. */
343  ObjectIdGetDatum(relid),
344  ObjectIdGetDatum(subid));
345 
346  if (!HeapTupleIsValid(tup))
347  {
349  *sublsn = InvalidXLogRecPtr;
350  return SUBREL_STATE_UNKNOWN;
351  }
352 
353  /* Get the state. */
354  substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
355 
356  /* Get the LSN */
358  Anum_pg_subscription_rel_srsublsn, &isnull);
359  if (isnull)
360  *sublsn = InvalidXLogRecPtr;
361  else
362  *sublsn = DatumGetLSN(d);
363 
364  /* Cleanup */
365  ReleaseSysCache(tup);
366 
368 
369  return substate;
370 }
371 
372 /*
373  * Drop subscription relation mapping. These can be for a particular
374  * subscription, or for a particular relation, or both.
375  */
376 void
378 {
379  Relation rel;
380  TableScanDesc scan;
381  ScanKeyData skey[2];
382  HeapTuple tup;
383  int nkeys = 0;
384 
385  rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
386 
387  if (OidIsValid(subid))
388  {
389  ScanKeyInit(&skey[nkeys++],
390  Anum_pg_subscription_rel_srsubid,
392  F_OIDEQ,
393  ObjectIdGetDatum(subid));
394  }
395 
396  if (OidIsValid(relid))
397  {
398  ScanKeyInit(&skey[nkeys++],
399  Anum_pg_subscription_rel_srrelid,
401  F_OIDEQ,
402  ObjectIdGetDatum(relid));
403  }
404 
405  /* Do the search and delete what we found. */
406  scan = table_beginscan_catalog(rel, nkeys, skey);
408  {
410 
411  subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
412 
413  /*
414  * We don't allow to drop the relation mapping when the table
415  * synchronization is in progress unless the caller updates the
416  * corresponding subscription as well. This is to ensure that we don't
417  * leave tablesync slots or origins in the system when the
418  * corresponding table is dropped.
419  */
420  if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
421  {
422  ereport(ERROR,
423  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
424  errmsg("could not drop relation mapping for subscription \"%s\"",
425  get_subscription_name(subrel->srsubid, false)),
426  errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
427  get_rel_name(relid), subrel->srsubstate),
428 
429  /*
430  * translator: first %s is a SQL ALTER command and second %s is a
431  * SQL DROP command
432  */
433  errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
434  "ALTER SUBSCRIPTION ... ENABLE",
435  "DROP SUBSCRIPTION ...")));
436  }
437 
438  CatalogTupleDelete(rel, &tup->t_self);
439  }
440  table_endscan(scan);
441 
443 }
444 
445 /*
446  * Does the subscription have any relations?
447  *
448  * Use this function only to know true/false, and when you have no need for the
449  * List returned by GetSubscriptionRelations.
450  */
451 bool
453 {
454  Relation rel;
455  ScanKeyData skey[1];
456  SysScanDesc scan;
457  bool has_subrels;
458 
459  rel = table_open(SubscriptionRelRelationId, AccessShareLock);
460 
461  ScanKeyInit(&skey[0],
462  Anum_pg_subscription_rel_srsubid,
463  BTEqualStrategyNumber, F_OIDEQ,
464  ObjectIdGetDatum(subid));
465 
466  scan = systable_beginscan(rel, InvalidOid, false,
467  NULL, 1, skey);
468 
469  /* If even a single tuple exists then the subscription has tables. */
470  has_subrels = HeapTupleIsValid(systable_getnext(scan));
471 
472  /* Cleanup */
473  systable_endscan(scan);
475 
476  return has_subrels;
477 }
478 
479 /*
480  * Get the relations for the subscription.
481  *
482  * If not_ready is true, return only the relations that are not in a ready
483  * state, otherwise return all the relations of the subscription. The
484  * returned list is palloc'ed in the current memory context.
485  */
486 List *
487 GetSubscriptionRelations(Oid subid, bool not_ready)
488 {
489  List *res = NIL;
490  Relation rel;
491  HeapTuple tup;
492  int nkeys = 0;
493  ScanKeyData skey[2];
494  SysScanDesc scan;
495 
496  rel = table_open(SubscriptionRelRelationId, AccessShareLock);
497 
498  ScanKeyInit(&skey[nkeys++],
499  Anum_pg_subscription_rel_srsubid,
500  BTEqualStrategyNumber, F_OIDEQ,
501  ObjectIdGetDatum(subid));
502 
503  if (not_ready)
504  ScanKeyInit(&skey[nkeys++],
505  Anum_pg_subscription_rel_srsubstate,
506  BTEqualStrategyNumber, F_CHARNE,
507  CharGetDatum(SUBREL_STATE_READY));
508 
509  scan = systable_beginscan(rel, InvalidOid, false,
510  NULL, nkeys, skey);
511 
512  while (HeapTupleIsValid(tup = systable_getnext(scan)))
513  {
515  SubscriptionRelState *relstate;
516  Datum d;
517  bool isnull;
518 
519  subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
520 
521  relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
522  relstate->relid = subrel->srrelid;
523  relstate->state = subrel->srsubstate;
525  Anum_pg_subscription_rel_srsublsn, &isnull);
526  if (isnull)
527  relstate->lsn = InvalidXLogRecPtr;
528  else
529  relstate->lsn = DatumGetLSN(d);
530 
531  res = lappend(res, relstate);
532  }
533 
534  /* Cleanup */
535  systable_endscan(scan);
537 
538  return res;
539 }
#define DatumGetArrayTypeP(X)
Definition: array.h:254
void deconstruct_array_builtin(ArrayType *array, Oid elmtype, Datum **elemsp, bool **nullsp, int *nelemsp)
Definition: arrayfuncs.c:3644
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define TextDatumGetCString(d)
Definition: builtins.h:95
#define NameStr(name)
Definition: c.h:735
#define OidIsValid(objectId)
Definition: c.h:764
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:599
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:506
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:387
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
Definition: heapam.c:1086
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1108
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1201
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1426
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
int i
Definition: isn.c:73
List * lappend(List *list, void *datum)
Definition: list.c:338
void list_free_deep(List *list)
Definition: list.c:1559
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1932
char * get_subscription_name(Oid subid, bool missing_ok)
Definition: lsyscache.c:3677
char * pstrdup(const char *in)
Definition: mcxt.c:1644
void pfree(void *pointer)
Definition: mcxt.c:1456
void * palloc(Size size)
Definition: mcxt.c:1226
#define NIL
Definition: pg_list.h:68
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:22
int CountDBSubscriptions(Oid dbid)
void FreeSubscription(Subscription *sub)
void DisableSubscription(Oid subid)
void RemoveSubscriptionRel(Oid subid, Oid relid)
List * GetSubscriptionRelations(Oid subid, bool not_ready)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
Subscription * GetSubscription(Oid subid, bool missing_ok)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
static List * textarray_to_stringlist(ArrayType *textarray)
bool HasSubscriptionRelations(Oid subid)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
FormData_pg_subscription * Form_pg_subscription
FormData_pg_subscription_rel * Form_pg_subscription_rel
static Name DatumGetName(Datum X)
Definition: postgres.h:360
uintptr_t Datum
Definition: postgres.h:64
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Datum CharGetDatum(char X)
Definition: postgres.h:122
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
#define RelationGetDescr(relation)
Definition: rel.h:530
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
@ ForwardScanDirection
Definition: sdir.h:28
#define BTEqualStrategyNumber
Definition: stratnum.h:31
ItemPointerData t_self
Definition: htup.h:65
Definition: pg_list.h:54
XLogRecPtr skiplsn
Definition: regguts.h:323
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:868
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:820
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1081
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:831
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition: syscache.c:1112
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:182
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:184
@ SUBSCRIPTIONRELMAP
Definition: syscache.h:100
@ SUBSCRIPTIONOID
Definition: syscache.h:99
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
Definition: tableam.c:112
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:1009
String * makeString(char *str)
Definition: value.c:63
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28