PostgreSQL Source Code  git master
pg_subscription.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "storage/lmgr.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/syscache.h"
Include dependency graph for pg_subscription.c:

Go to the source code of this file.

Functions

static Listtextarray_to_stringlist (ArrayType *textarray)
 
SubscriptionGetSubscription (Oid subid, bool missing_ok)
 
int CountDBSubscriptions (Oid dbid)
 
void FreeSubscription (Subscription *sub)
 
void DisableSubscription (Oid subid)
 
void AddSubscriptionRelState (Oid subid, Oid relid, char state, XLogRecPtr sublsn)
 
void UpdateSubscriptionRelState (Oid subid, Oid relid, char state, XLogRecPtr sublsn)
 
char GetSubscriptionRelState (Oid subid, Oid relid, XLogRecPtr *sublsn)
 
void RemoveSubscriptionRel (Oid subid, Oid relid)
 
bool HasSubscriptionRelations (Oid subid)
 
ListGetSubscriptionRelations (Oid subid, bool not_ready)
 

Function Documentation

◆ AddSubscriptionRelState()

void AddSubscriptionRelState ( Oid  subid,
Oid  relid,
char  state,
XLogRecPtr  sublsn 
)

Definition at line 236 of file pg_subscription.c.

238 {
239  Relation rel;
240  HeapTuple tup;
241  bool nulls[Natts_pg_subscription_rel];
242  Datum values[Natts_pg_subscription_rel];
243 
244  LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
245 
246  rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
247 
248  /* Try finding existing mapping. */
250  ObjectIdGetDatum(relid),
251  ObjectIdGetDatum(subid));
252  if (HeapTupleIsValid(tup))
253  elog(ERROR, "subscription table %u in subscription %u already exists",
254  relid, subid);
255 
256  /* Form the tuple. */
257  memset(values, 0, sizeof(values));
258  memset(nulls, false, sizeof(nulls));
259  values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
260  values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
261  values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
262  if (sublsn != InvalidXLogRecPtr)
263  values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
264  else
265  nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
266 
267  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
268 
269  /* Insert tuple into catalog. */
270  CatalogTupleInsert(rel, tup);
271 
272  heap_freetuple(tup);
273 
274  /* Cleanup. */
275  table_close(rel, NoLock);
276 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define ERROR
Definition: elog.h:39
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Datum CharGetDatum(char X)
Definition: postgres.h:122
#define RelationGetDescr(relation)
Definition: rel.h:529
Definition: regguts.h:318
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:181
@ SUBSCRIPTIONRELMAP
Definition: syscache.h:100
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References AccessShareLock, CatalogTupleInsert(), CharGetDatum(), elog(), ERROR, heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InvalidXLogRecPtr, LockSharedObject(), LSNGetDatum(), NoLock, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONRELMAP, table_close(), table_open(), and values.

Referenced by AlterSubscription_refresh(), and CreateSubscription().

◆ CountDBSubscriptions()

int CountDBSubscriptions ( Oid  dbid)

Definition at line 127 of file pg_subscription.c.

128 {
129  int nsubs = 0;
130  Relation rel;
131  ScanKeyData scankey;
132  SysScanDesc scan;
133  HeapTuple tup;
134 
135  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
136 
137  ScanKeyInit(&scankey,
138  Anum_pg_subscription_subdbid,
139  BTEqualStrategyNumber, F_OIDEQ,
140  ObjectIdGetDatum(dbid));
141 
142  scan = systable_beginscan(rel, InvalidOid, false,
143  NULL, 1, &scankey);
144 
145  while (HeapTupleIsValid(tup = systable_getnext(scan)))
146  nsubs++;
147 
148  systable_endscan(scan);
149 
150  table_close(rel, NoLock);
151 
152  return nsubs;
153 }
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:599
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:506
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:387
#define InvalidOid
Definition: postgres_ext.h:36
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define BTEqualStrategyNumber
Definition: stratnum.h:31

References BTEqualStrategyNumber, HeapTupleIsValid, InvalidOid, NoLock, ObjectIdGetDatum(), RowExclusiveLock, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), and table_open().

Referenced by dropdb().

◆ DisableSubscription()

void DisableSubscription ( Oid  subid)

Definition at line 173 of file pg_subscription.c.

174 {
175  Relation rel;
176  bool nulls[Natts_pg_subscription];
177  bool replaces[Natts_pg_subscription];
178  Datum values[Natts_pg_subscription];
179  HeapTuple tup;
180 
181  /* Look up the subscription in the catalog */
182  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
184 
185  if (!HeapTupleIsValid(tup))
186  elog(ERROR, "cache lookup failed for subscription %u", subid);
187 
188  LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
189 
190  /* Form a new tuple. */
191  memset(values, 0, sizeof(values));
192  memset(nulls, false, sizeof(nulls));
193  memset(replaces, false, sizeof(replaces));
194 
195  /* Set the subscription to disabled. */
196  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
197  replaces[Anum_pg_subscription_subenabled - 1] = true;
198 
199  /* Update the catalog */
200  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
201  replaces);
202  CatalogTupleUpdate(rel, &tup->t_self, tup);
203  heap_freetuple(tup);
204 
205  table_close(rel, NoLock);
206 }
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
ItemPointerData t_self
Definition: htup.h:65
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:179
@ SUBSCRIPTIONOID
Definition: syscache.h:99

References AccessShareLock, BoolGetDatum(), CatalogTupleUpdate(), elog(), ERROR, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, LockSharedObject(), NoLock, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, HeapTupleData::t_self, table_close(), table_open(), and values.

Referenced by DisableSubscriptionAndExit().

◆ FreeSubscription()

void FreeSubscription ( Subscription sub)

Definition at line 159 of file pg_subscription.c.

160 {
161  pfree(sub->name);
162  pfree(sub->conninfo);
163  if (sub->slotname)
164  pfree(sub->slotname);
166  pfree(sub);
167 }
void list_free_deep(List *list)
Definition: list.c:1559
void pfree(void *pointer)
Definition: mcxt.c:1436

References Subscription::conninfo, list_free_deep(), Subscription::name, pfree(), Subscription::publications, and Subscription::slotname.

Referenced by maybe_reread_subscription().

◆ GetSubscription()

Subscription* GetSubscription ( Oid  subid,
bool  missing_ok 
)

Definition at line 43 of file pg_subscription.c.

44 {
45  HeapTuple tup;
46  Subscription *sub;
47  Form_pg_subscription subform;
48  Datum datum;
49  bool isnull;
50 
52 
53  if (!HeapTupleIsValid(tup))
54  {
55  if (missing_ok)
56  return NULL;
57 
58  elog(ERROR, "cache lookup failed for subscription %u", subid);
59  }
60 
61  subform = (Form_pg_subscription) GETSTRUCT(tup);
62 
63  sub = (Subscription *) palloc(sizeof(Subscription));
64  sub->oid = subid;
65  sub->dbid = subform->subdbid;
66  sub->skiplsn = subform->subskiplsn;
67  sub->name = pstrdup(NameStr(subform->subname));
68  sub->owner = subform->subowner;
69  sub->enabled = subform->subenabled;
70  sub->binary = subform->subbinary;
71  sub->stream = subform->substream;
72  sub->twophasestate = subform->subtwophasestate;
73  sub->disableonerr = subform->subdisableonerr;
74 
75  /* Get conninfo */
77  tup,
78  Anum_pg_subscription_subconninfo,
79  &isnull);
80  Assert(!isnull);
81  sub->conninfo = TextDatumGetCString(datum);
82 
83  /* Get slotname */
85  tup,
86  Anum_pg_subscription_subslotname,
87  &isnull);
88  if (!isnull)
89  sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
90  else
91  sub->slotname = NULL;
92 
93  /* Get synccommit */
95  tup,
96  Anum_pg_subscription_subsynccommit,
97  &isnull);
98  Assert(!isnull);
99  sub->synccommit = TextDatumGetCString(datum);
100 
101  /* Get publications */
103  tup,
104  Anum_pg_subscription_subpublications,
105  &isnull);
106  Assert(!isnull);
108 
109  /* Get origin */
111  tup,
112  Anum_pg_subscription_suborigin,
113  &isnull);
114  Assert(!isnull);
115  sub->origin = TextDatumGetCString(datum);
116 
117  ReleaseSysCache(tup);
118 
119  return sub;
120 }
#define DatumGetArrayTypeP(X)
Definition: array.h:254
#define TextDatumGetCString(d)
Definition: builtins.h:95
#define NameStr(name)
Definition: c.h:730
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
Assert(fmt[strlen(fmt) - 1] !='\n')
char * pstrdup(const char *in)
Definition: mcxt.c:1624
void * palloc(Size size)
Definition: mcxt.c:1210
static List * textarray_to_stringlist(ArrayType *textarray)
FormData_pg_subscription * Form_pg_subscription
static Name DatumGetName(Datum X)
Definition: postgres.h:360
XLogRecPtr skiplsn
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:865
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:817
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1078

References Assert(), Subscription::binary, Subscription::conninfo, DatumGetArrayTypeP, DatumGetName(), Subscription::dbid, Subscription::disableonerr, elog(), Subscription::enabled, ERROR, GETSTRUCT, HeapTupleIsValid, Subscription::name, NameStr, ObjectIdGetDatum(), Subscription::oid, Subscription::origin, Subscription::owner, palloc(), pstrdup(), Subscription::publications, ReleaseSysCache(), SearchSysCache1(), Subscription::skiplsn, Subscription::slotname, Subscription::stream, SUBSCRIPTIONOID, Subscription::synccommit, SysCacheGetAttr(), textarray_to_stringlist(), TextDatumGetCString, and Subscription::twophasestate.

Referenced by AlterSubscription(), InitializeApplyWorker(), and maybe_reread_subscription().

◆ GetSubscriptionRelations()

List* GetSubscriptionRelations ( Oid  subid,
bool  not_ready 
)

Definition at line 493 of file pg_subscription.c.

494 {
495  List *res = NIL;
496  Relation rel;
497  HeapTuple tup;
498  int nkeys = 0;
499  ScanKeyData skey[2];
500  SysScanDesc scan;
501 
502  rel = table_open(SubscriptionRelRelationId, AccessShareLock);
503 
504  ScanKeyInit(&skey[nkeys++],
505  Anum_pg_subscription_rel_srsubid,
506  BTEqualStrategyNumber, F_OIDEQ,
507  ObjectIdGetDatum(subid));
508 
509  if (not_ready)
510  ScanKeyInit(&skey[nkeys++],
511  Anum_pg_subscription_rel_srsubstate,
512  BTEqualStrategyNumber, F_CHARNE,
513  CharGetDatum(SUBREL_STATE_READY));
514 
515  scan = systable_beginscan(rel, InvalidOid, false,
516  NULL, nkeys, skey);
517 
518  while (HeapTupleIsValid(tup = systable_getnext(scan)))
519  {
521  SubscriptionRelState *relstate;
522  Datum d;
523  bool isnull;
524 
525  subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
526 
527  relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
528  relstate->relid = subrel->srrelid;
529  relstate->state = subrel->srsubstate;
531  Anum_pg_subscription_rel_srsublsn, &isnull);
532  if (isnull)
533  relstate->lsn = InvalidXLogRecPtr;
534  else
535  relstate->lsn = DatumGetLSN(d);
536 
537  res = lappend(res, relstate);
538  }
539 
540  /* Cleanup */
541  systable_endscan(scan);
543 
544  return res;
545 }
List * lappend(List *list, void *datum)
Definition: list.c:338
#define NIL
Definition: pg_list.h:68
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:22
FormData_pg_subscription_rel * Form_pg_subscription_rel
Definition: pg_list.h:54

References AccessShareLock, BTEqualStrategyNumber, CharGetDatum(), DatumGetLSN(), GETSTRUCT, HeapTupleIsValid, InvalidOid, InvalidXLogRecPtr, lappend(), SubscriptionRelState::lsn, NIL, ObjectIdGetDatum(), palloc(), SubscriptionRelState::relid, res, ScanKeyInit(), SubscriptionRelState::state, SUBSCRIPTIONRELMAP, SysCacheGetAttr(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), and table_open().

Referenced by AlterSubscription_refresh(), DropSubscription(), and FetchTableStates().

◆ GetSubscriptionRelState()

char GetSubscriptionRelState ( Oid  subid,
Oid  relid,
XLogRecPtr sublsn 
)

Definition at line 333 of file pg_subscription.c.

334 {
335  HeapTuple tup;
336  char substate;
337  bool isnull;
338  Datum d;
339  Relation rel;
340 
341  /*
342  * This is to avoid the race condition with AlterSubscription which tries
343  * to remove this relstate.
344  */
345  rel = table_open(SubscriptionRelRelationId, AccessShareLock);
346 
347  /* Try finding the mapping. */
349  ObjectIdGetDatum(relid),
350  ObjectIdGetDatum(subid));
351 
352  if (!HeapTupleIsValid(tup))
353  {
355  *sublsn = InvalidXLogRecPtr;
356  return SUBREL_STATE_UNKNOWN;
357  }
358 
359  /* Get the state. */
360  substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
361 
362  /* Get the LSN */
364  Anum_pg_subscription_rel_srsublsn, &isnull);
365  if (isnull)
366  *sublsn = InvalidXLogRecPtr;
367  else
368  *sublsn = DatumGetLSN(d);
369 
370  /* Cleanup */
371  ReleaseSysCache(tup);
372 
374 
375  return substate;
376 }
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:828

References AccessShareLock, DatumGetLSN(), GETSTRUCT, HeapTupleIsValid, InvalidXLogRecPtr, ObjectIdGetDatum(), ReleaseSysCache(), SearchSysCache2(), SUBSCRIPTIONRELMAP, SysCacheGetAttr(), table_close(), and table_open().

Referenced by AlterSubscription_refresh(), logicalrep_rel_open(), LogicalRepSyncTableStart(), and wait_for_relation_state_change().

◆ HasSubscriptionRelations()

bool HasSubscriptionRelations ( Oid  subid)

Definition at line 458 of file pg_subscription.c.

459 {
460  Relation rel;
461  ScanKeyData skey[1];
462  SysScanDesc scan;
463  bool has_subrels;
464 
465  rel = table_open(SubscriptionRelRelationId, AccessShareLock);
466 
467  ScanKeyInit(&skey[0],
468  Anum_pg_subscription_rel_srsubid,
469  BTEqualStrategyNumber, F_OIDEQ,
470  ObjectIdGetDatum(subid));
471 
472  scan = systable_beginscan(rel, InvalidOid, false,
473  NULL, 1, skey);
474 
475  /* If even a single tuple exists then the subscription has tables. */
476  has_subrels = HeapTupleIsValid(systable_getnext(scan));
477 
478  /* Cleanup */
479  systable_endscan(scan);
481 
482  return has_subrels;
483 }

References AccessShareLock, BTEqualStrategyNumber, HeapTupleIsValid, InvalidOid, ObjectIdGetDatum(), ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), and table_open().

Referenced by FetchTableStates().

◆ RemoveSubscriptionRel()

void RemoveSubscriptionRel ( Oid  subid,
Oid  relid 
)

Definition at line 383 of file pg_subscription.c.

384 {
385  Relation rel;
386  TableScanDesc scan;
387  ScanKeyData skey[2];
388  HeapTuple tup;
389  int nkeys = 0;
390 
391  rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
392 
393  if (OidIsValid(subid))
394  {
395  ScanKeyInit(&skey[nkeys++],
396  Anum_pg_subscription_rel_srsubid,
398  F_OIDEQ,
399  ObjectIdGetDatum(subid));
400  }
401 
402  if (OidIsValid(relid))
403  {
404  ScanKeyInit(&skey[nkeys++],
405  Anum_pg_subscription_rel_srrelid,
407  F_OIDEQ,
408  ObjectIdGetDatum(relid));
409  }
410 
411  /* Do the search and delete what we found. */
412  scan = table_beginscan_catalog(rel, nkeys, skey);
414  {
416 
417  subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
418 
419  /*
420  * We don't allow to drop the relation mapping when the table
421  * synchronization is in progress unless the caller updates the
422  * corresponding subscription as well. This is to ensure that we don't
423  * leave tablesync slots or origins in the system when the
424  * corresponding table is dropped.
425  */
426  if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
427  {
428  ereport(ERROR,
429  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
430  errmsg("could not drop relation mapping for subscription \"%s\"",
431  get_subscription_name(subrel->srsubid, false)),
432  errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
433  get_rel_name(relid), subrel->srsubstate),
434 
435  /*
436  * translator: first %s is a SQL ALTER command and second %s is a
437  * SQL DROP command
438  */
439  errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
440  "ALTER SUBSCRIPTION ... ENABLE",
441  "DROP SUBSCRIPTION ...")));
442  }
443 
444  CatalogTupleDelete(rel, &tup->t_self);
445  }
446  table_endscan(scan);
447 
449 }
#define OidIsValid(objectId)
Definition: c.h:759
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ereport(elevel,...)
Definition: elog.h:149
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
Definition: heapam.c:1093
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1910
char * get_subscription_name(Oid subid, bool missing_ok)
Definition: lsyscache.c:3664
@ ForwardScanDirection
Definition: sdir.h:28
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
Definition: tableam.c:112
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:1011

References BTEqualStrategyNumber, CatalogTupleDelete(), ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, ForwardScanDirection, get_rel_name(), get_subscription_name(), GETSTRUCT, heap_getnext(), HeapTupleIsValid, ObjectIdGetDatum(), OidIsValid, RowExclusiveLock, ScanKeyInit(), HeapTupleData::t_self, table_beginscan_catalog(), table_close(), table_endscan(), and table_open().

Referenced by AlterSubscription_refresh(), DropSubscription(), and heap_drop_with_catalog().

◆ textarray_to_stringlist()

static List * textarray_to_stringlist ( ArrayType textarray)
static

Definition at line 214 of file pg_subscription.c.

215 {
216  Datum *elems;
217  int nelems,
218  i;
219  List *res = NIL;
220 
221  deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
222 
223  if (nelems == 0)
224  return NIL;
225 
226  for (i = 0; i < nelems; i++)
228 
229  return res;
230 }
void deconstruct_array_builtin(ArrayType *array, Oid elmtype, Datum **elemsp, bool **nullsp, int *nelemsp)
Definition: arrayfuncs.c:3668
int i
Definition: isn.c:73
String * makeString(char *str)
Definition: value.c:63

References deconstruct_array_builtin(), i, lappend(), makeString(), NIL, res, and TextDatumGetCString.

Referenced by GetSubscription().

◆ UpdateSubscriptionRelState()

void UpdateSubscriptionRelState ( Oid  subid,
Oid  relid,
char  state,
XLogRecPtr  sublsn 
)

Definition at line 282 of file pg_subscription.c.

284 {
285  Relation rel;
286  HeapTuple tup;
287  bool nulls[Natts_pg_subscription_rel];
288  Datum values[Natts_pg_subscription_rel];
289  bool replaces[Natts_pg_subscription_rel];
290 
291  LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
292 
293  rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
294 
295  /* Try finding existing mapping. */
297  ObjectIdGetDatum(relid),
298  ObjectIdGetDatum(subid));
299  if (!HeapTupleIsValid(tup))
300  elog(ERROR, "subscription table %u in subscription %u does not exist",
301  relid, subid);
302 
303  /* Update the tuple. */
304  memset(values, 0, sizeof(values));
305  memset(nulls, false, sizeof(nulls));
306  memset(replaces, false, sizeof(replaces));
307 
308  replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
309  values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
310 
311  replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
312  if (sublsn != InvalidXLogRecPtr)
313  values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
314  else
315  nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
316 
317  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
318  replaces);
319 
320  /* Update the catalog. */
321  CatalogTupleUpdate(rel, &tup->t_self, tup);
322 
323  /* Cleanup. */
324  table_close(rel, NoLock);
325 }

References AccessShareLock, CatalogTupleUpdate(), CharGetDatum(), elog(), ERROR, heap_modify_tuple(), HeapTupleIsValid, InvalidXLogRecPtr, LockSharedObject(), LSNGetDatum(), NoLock, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONRELMAP, HeapTupleData::t_self, table_close(), table_open(), and values.

Referenced by LogicalRepSyncTableStart(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().