PostgreSQL Source Code  git master
pg_subscription.c File Reference
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "storage/lmgr.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/syscache.h"
Include dependency graph for pg_subscription.c:

Go to the source code of this file.

Functions

static Listtextarray_to_stringlist (ArrayType *textarray)
 
SubscriptionGetSubscription (Oid subid, bool missing_ok)
 
int CountDBSubscriptions (Oid dbid)
 
void FreeSubscription (Subscription *sub)
 
void DisableSubscription (Oid subid)
 
void AddSubscriptionRelState (Oid subid, Oid relid, char state, XLogRecPtr sublsn)
 
void UpdateSubscriptionRelState (Oid subid, Oid relid, char state, XLogRecPtr sublsn)
 
char GetSubscriptionRelState (Oid subid, Oid relid, XLogRecPtr *sublsn)
 
void RemoveSubscriptionRel (Oid subid, Oid relid)
 
bool HasSubscriptionRelations (Oid subid)
 
ListGetSubscriptionRelations (Oid subid, bool not_ready)
 

Function Documentation

◆ AddSubscriptionRelState()

void AddSubscriptionRelState ( Oid  subid,
Oid  relid,
char  state,
XLogRecPtr  sublsn 
)

Definition at line 233 of file pg_subscription.c.

235 {
236  Relation rel;
237  HeapTuple tup;
238  bool nulls[Natts_pg_subscription_rel];
239  Datum values[Natts_pg_subscription_rel];
240 
241  LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
242 
243  rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
244 
245  /* Try finding existing mapping. */
247  ObjectIdGetDatum(relid),
248  ObjectIdGetDatum(subid));
249  if (HeapTupleIsValid(tup))
250  elog(ERROR, "subscription table %u in subscription %u already exists",
251  relid, subid);
252 
253  /* Form the tuple. */
254  memset(values, 0, sizeof(values));
255  memset(nulls, false, sizeof(nulls));
256  values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
257  values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
258  values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
259  if (sublsn != InvalidXLogRecPtr)
260  values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
261  else
262  nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
263 
264  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
265 
266  /* Insert tuple into catalog. */
267  CatalogTupleInsert(rel, tup);
268 
269  heap_freetuple(tup);
270 
271  /* Cleanup. */
272  table_close(rel, NoLock);
273 }
static Datum values[MAXATTR]
Definition: bootstrap.c:156
#define ERROR
Definition: elog.h:39
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:233
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1046
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Datum CharGetDatum(char X)
Definition: postgres.h:122
#define RelationGetDescr(relation)
Definition: rel.h:530
Definition: regguts.h:323
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:184
@ SUBSCRIPTIONRELMAP
Definition: syscache.h:100
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References AccessShareLock, CatalogTupleInsert(), CharGetDatum(), elog(), ERROR, heap_form_tuple(), heap_freetuple(), HeapTupleIsValid, InvalidXLogRecPtr, LockSharedObject(), LSNGetDatum(), NoLock, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONRELMAP, table_close(), table_open(), and values.

Referenced by AlterSubscription_refresh(), and CreateSubscription().

◆ CountDBSubscriptions()

int CountDBSubscriptions ( Oid  dbid)

Definition at line 124 of file pg_subscription.c.

125 {
126  int nsubs = 0;
127  Relation rel;
128  ScanKeyData scankey;
129  SysScanDesc scan;
130  HeapTuple tup;
131 
132  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
133 
134  ScanKeyInit(&scankey,
135  Anum_pg_subscription_subdbid,
136  BTEqualStrategyNumber, F_OIDEQ,
137  ObjectIdGetDatum(dbid));
138 
139  scan = systable_beginscan(rel, InvalidOid, false,
140  NULL, 1, &scankey);
141 
142  while (HeapTupleIsValid(tup = systable_getnext(scan)))
143  nsubs++;
144 
145  systable_endscan(scan);
146 
147  table_close(rel, NoLock);
148 
149  return nsubs;
150 }
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:599
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:506
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:387
#define InvalidOid
Definition: postgres_ext.h:36
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define BTEqualStrategyNumber
Definition: stratnum.h:31

References BTEqualStrategyNumber, HeapTupleIsValid, InvalidOid, NoLock, ObjectIdGetDatum(), RowExclusiveLock, ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), and table_open().

Referenced by dropdb().

◆ DisableSubscription()

void DisableSubscription ( Oid  subid)

Definition at line 170 of file pg_subscription.c.

171 {
172  Relation rel;
173  bool nulls[Natts_pg_subscription];
174  bool replaces[Natts_pg_subscription];
175  Datum values[Natts_pg_subscription];
176  HeapTuple tup;
177 
178  /* Look up the subscription in the catalog */
179  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
181 
182  if (!HeapTupleIsValid(tup))
183  elog(ERROR, "cache lookup failed for subscription %u", subid);
184 
185  LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
186 
187  /* Form a new tuple. */
188  memset(values, 0, sizeof(values));
189  memset(nulls, false, sizeof(nulls));
190  memset(replaces, false, sizeof(replaces));
191 
192  /* Set the subscription to disabled. */
193  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
194  replaces[Anum_pg_subscription_subenabled - 1] = true;
195 
196  /* Update the catalog */
197  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
198  replaces);
199  CatalogTupleUpdate(rel, &tup->t_self, tup);
200  heap_freetuple(tup);
201 
202  table_close(rel, NoLock);
203 }
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:313
static Datum BoolGetDatum(bool X)
Definition: postgres.h:102
ItemPointerData t_self
Definition: htup.h:65
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:182
@ SUBSCRIPTIONOID
Definition: syscache.h:99

References AccessShareLock, BoolGetDatum(), CatalogTupleUpdate(), elog(), ERROR, heap_freetuple(), heap_modify_tuple(), HeapTupleIsValid, LockSharedObject(), NoLock, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy1, SUBSCRIPTIONOID, HeapTupleData::t_self, table_close(), table_open(), and values.

Referenced by DisableSubscriptionAndExit().

◆ FreeSubscription()

void FreeSubscription ( Subscription sub)

Definition at line 156 of file pg_subscription.c.

157 {
158  pfree(sub->name);
159  pfree(sub->conninfo);
160  if (sub->slotname)
161  pfree(sub->slotname);
163  pfree(sub);
164 }
void list_free_deep(List *list)
Definition: list.c:1559
void pfree(void *pointer)
Definition: mcxt.c:1456

References Subscription::conninfo, list_free_deep(), Subscription::name, pfree(), Subscription::publications, and Subscription::slotname.

Referenced by maybe_reread_subscription().

◆ GetSubscription()

Subscription* GetSubscription ( Oid  subid,
bool  missing_ok 
)

Definition at line 43 of file pg_subscription.c.

44 {
45  HeapTuple tup;
46  Subscription *sub;
47  Form_pg_subscription subform;
48  Datum datum;
49  bool isnull;
50 
52 
53  if (!HeapTupleIsValid(tup))
54  {
55  if (missing_ok)
56  return NULL;
57 
58  elog(ERROR, "cache lookup failed for subscription %u", subid);
59  }
60 
61  subform = (Form_pg_subscription) GETSTRUCT(tup);
62 
63  sub = (Subscription *) palloc(sizeof(Subscription));
64  sub->oid = subid;
65  sub->dbid = subform->subdbid;
66  sub->skiplsn = subform->subskiplsn;
67  sub->name = pstrdup(NameStr(subform->subname));
68  sub->owner = subform->subowner;
69  sub->enabled = subform->subenabled;
70  sub->binary = subform->subbinary;
71  sub->stream = subform->substream;
72  sub->twophasestate = subform->subtwophasestate;
73  sub->disableonerr = subform->subdisableonerr;
74  sub->passwordrequired = subform->subpasswordrequired;
75  sub->runasowner = subform->subrunasowner;
76 
77  /* Get conninfo */
79  tup,
80  Anum_pg_subscription_subconninfo);
81  sub->conninfo = TextDatumGetCString(datum);
82 
83  /* Get slotname */
85  tup,
86  Anum_pg_subscription_subslotname,
87  &isnull);
88  if (!isnull)
89  sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
90  else
91  sub->slotname = NULL;
92 
93  /* Get synccommit */
95  tup,
96  Anum_pg_subscription_subsynccommit);
97  sub->synccommit = TextDatumGetCString(datum);
98 
99  /* Get publications */
101  tup,
102  Anum_pg_subscription_subpublications);
104 
105  /* Get origin */
107  tup,
108  Anum_pg_subscription_suborigin);
109  sub->origin = TextDatumGetCString(datum);
110 
111  /* Is the subscription owner a superuser? */
112  sub->ownersuperuser = superuser_arg(sub->owner);
113 
114  ReleaseSysCache(tup);
115 
116  return sub;
117 }
#define DatumGetArrayTypeP(X)
Definition: array.h:261
#define TextDatumGetCString(d)
Definition: builtins.h:95
#define NameStr(name)
Definition: c.h:735
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
char * pstrdup(const char *in)
Definition: mcxt.c:1644
void * palloc(Size size)
Definition: mcxt.c:1226
static List * textarray_to_stringlist(ArrayType *textarray)
FormData_pg_subscription * Form_pg_subscription
static Name DatumGetName(Datum X)
Definition: postgres.h:360
XLogRecPtr skiplsn
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:868
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:820
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1081
Datum SysCacheGetAttrNotNull(int cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition: syscache.c:1112

References Subscription::binary, Subscription::conninfo, DatumGetArrayTypeP, DatumGetName(), Subscription::dbid, Subscription::disableonerr, elog(), Subscription::enabled, ERROR, GETSTRUCT, HeapTupleIsValid, Subscription::name, NameStr, ObjectIdGetDatum(), Subscription::oid, Subscription::origin, Subscription::owner, Subscription::ownersuperuser, palloc(), Subscription::passwordrequired, pstrdup(), Subscription::publications, ReleaseSysCache(), Subscription::runasowner, SearchSysCache1(), Subscription::skiplsn, Subscription::slotname, Subscription::stream, SUBSCRIPTIONOID, superuser_arg(), Subscription::synccommit, SysCacheGetAttr(), SysCacheGetAttrNotNull(), textarray_to_stringlist(), TextDatumGetCString, and Subscription::twophasestate.

Referenced by AlterSubscription(), InitializeLogRepWorker(), and maybe_reread_subscription().

◆ GetSubscriptionRelations()

List* GetSubscriptionRelations ( Oid  subid,
bool  not_ready 
)

Definition at line 490 of file pg_subscription.c.

491 {
492  List *res = NIL;
493  Relation rel;
494  HeapTuple tup;
495  int nkeys = 0;
496  ScanKeyData skey[2];
497  SysScanDesc scan;
498 
499  rel = table_open(SubscriptionRelRelationId, AccessShareLock);
500 
501  ScanKeyInit(&skey[nkeys++],
502  Anum_pg_subscription_rel_srsubid,
503  BTEqualStrategyNumber, F_OIDEQ,
504  ObjectIdGetDatum(subid));
505 
506  if (not_ready)
507  ScanKeyInit(&skey[nkeys++],
508  Anum_pg_subscription_rel_srsubstate,
509  BTEqualStrategyNumber, F_CHARNE,
510  CharGetDatum(SUBREL_STATE_READY));
511 
512  scan = systable_beginscan(rel, InvalidOid, false,
513  NULL, nkeys, skey);
514 
515  while (HeapTupleIsValid(tup = systable_getnext(scan)))
516  {
518  SubscriptionRelState *relstate;
519  Datum d;
520  bool isnull;
521 
522  subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
523 
524  relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
525  relstate->relid = subrel->srrelid;
526  relstate->state = subrel->srsubstate;
528  Anum_pg_subscription_rel_srsublsn, &isnull);
529  if (isnull)
530  relstate->lsn = InvalidXLogRecPtr;
531  else
532  relstate->lsn = DatumGetLSN(d);
533 
534  res = lappend(res, relstate);
535  }
536 
537  /* Cleanup */
538  systable_endscan(scan);
540 
541  return res;
542 }
List * lappend(List *list, void *datum)
Definition: list.c:338
#define NIL
Definition: pg_list.h:68
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:22
FormData_pg_subscription_rel * Form_pg_subscription_rel
Definition: pg_list.h:54

References AccessShareLock, BTEqualStrategyNumber, CharGetDatum(), DatumGetLSN(), GETSTRUCT, HeapTupleIsValid, InvalidOid, InvalidXLogRecPtr, lappend(), SubscriptionRelState::lsn, NIL, ObjectIdGetDatum(), palloc(), SubscriptionRelState::relid, res, ScanKeyInit(), SubscriptionRelState::state, SUBSCRIPTIONRELMAP, SysCacheGetAttr(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), and table_open().

Referenced by AlterSubscription_refresh(), DropSubscription(), and FetchTableStates().

◆ GetSubscriptionRelState()

char GetSubscriptionRelState ( Oid  subid,
Oid  relid,
XLogRecPtr sublsn 
)

Definition at line 330 of file pg_subscription.c.

331 {
332  HeapTuple tup;
333  char substate;
334  bool isnull;
335  Datum d;
336  Relation rel;
337 
338  /*
339  * This is to avoid the race condition with AlterSubscription which tries
340  * to remove this relstate.
341  */
342  rel = table_open(SubscriptionRelRelationId, AccessShareLock);
343 
344  /* Try finding the mapping. */
346  ObjectIdGetDatum(relid),
347  ObjectIdGetDatum(subid));
348 
349  if (!HeapTupleIsValid(tup))
350  {
352  *sublsn = InvalidXLogRecPtr;
353  return SUBREL_STATE_UNKNOWN;
354  }
355 
356  /* Get the state. */
357  substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
358 
359  /* Get the LSN */
361  Anum_pg_subscription_rel_srsublsn, &isnull);
362  if (isnull)
363  *sublsn = InvalidXLogRecPtr;
364  else
365  *sublsn = DatumGetLSN(d);
366 
367  /* Cleanup */
368  ReleaseSysCache(tup);
369 
371 
372  return substate;
373 }
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:831

References AccessShareLock, DatumGetLSN(), GETSTRUCT, HeapTupleIsValid, InvalidXLogRecPtr, ObjectIdGetDatum(), ReleaseSysCache(), SearchSysCache2(), SUBSCRIPTIONRELMAP, SysCacheGetAttr(), table_close(), and table_open().

Referenced by AlterSubscription_refresh(), logicalrep_rel_open(), LogicalRepSyncTableStart(), and wait_for_relation_state_change().

◆ HasSubscriptionRelations()

bool HasSubscriptionRelations ( Oid  subid)

Definition at line 455 of file pg_subscription.c.

456 {
457  Relation rel;
458  ScanKeyData skey[1];
459  SysScanDesc scan;
460  bool has_subrels;
461 
462  rel = table_open(SubscriptionRelRelationId, AccessShareLock);
463 
464  ScanKeyInit(&skey[0],
465  Anum_pg_subscription_rel_srsubid,
466  BTEqualStrategyNumber, F_OIDEQ,
467  ObjectIdGetDatum(subid));
468 
469  scan = systable_beginscan(rel, InvalidOid, false,
470  NULL, 1, skey);
471 
472  /* If even a single tuple exists then the subscription has tables. */
473  has_subrels = HeapTupleIsValid(systable_getnext(scan));
474 
475  /* Cleanup */
476  systable_endscan(scan);
478 
479  return has_subrels;
480 }

References AccessShareLock, BTEqualStrategyNumber, HeapTupleIsValid, InvalidOid, ObjectIdGetDatum(), ScanKeyInit(), systable_beginscan(), systable_endscan(), systable_getnext(), table_close(), and table_open().

Referenced by FetchTableStates().

◆ RemoveSubscriptionRel()

void RemoveSubscriptionRel ( Oid  subid,
Oid  relid 
)

Definition at line 380 of file pg_subscription.c.

381 {
382  Relation rel;
383  TableScanDesc scan;
384  ScanKeyData skey[2];
385  HeapTuple tup;
386  int nkeys = 0;
387 
388  rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
389 
390  if (OidIsValid(subid))
391  {
392  ScanKeyInit(&skey[nkeys++],
393  Anum_pg_subscription_rel_srsubid,
395  F_OIDEQ,
396  ObjectIdGetDatum(subid));
397  }
398 
399  if (OidIsValid(relid))
400  {
401  ScanKeyInit(&skey[nkeys++],
402  Anum_pg_subscription_rel_srrelid,
404  F_OIDEQ,
405  ObjectIdGetDatum(relid));
406  }
407 
408  /* Do the search and delete what we found. */
409  scan = table_beginscan_catalog(rel, nkeys, skey);
411  {
413 
414  subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
415 
416  /*
417  * We don't allow to drop the relation mapping when the table
418  * synchronization is in progress unless the caller updates the
419  * corresponding subscription as well. This is to ensure that we don't
420  * leave tablesync slots or origins in the system when the
421  * corresponding table is dropped.
422  */
423  if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
424  {
425  ereport(ERROR,
426  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
427  errmsg("could not drop relation mapping for subscription \"%s\"",
428  get_subscription_name(subrel->srsubid, false)),
429  errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
430  get_rel_name(relid), subrel->srsubstate),
431 
432  /*
433  * translator: first %s is a SQL ALTER command and second %s is a
434  * SQL DROP command
435  */
436  errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
437  "ALTER SUBSCRIPTION ... ENABLE",
438  "DROP SUBSCRIPTION ...")));
439  }
440 
441  CatalogTupleDelete(rel, &tup->t_self);
442  }
443  table_endscan(scan);
444 
446 }
#define OidIsValid(objectId)
Definition: c.h:764
int errdetail(const char *fmt,...)
Definition: elog.c:1202
int errhint(const char *fmt,...)
Definition: elog.c:1316
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ereport(elevel,...)
Definition: elog.h:149
HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction)
Definition: heapam.c:1086
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1932
char * get_subscription_name(Oid subid, bool missing_ok)
Definition: lsyscache.c:3677
@ ForwardScanDirection
Definition: sdir.h:28
TableScanDesc table_beginscan_catalog(Relation relation, int nkeys, struct ScanKeyData *key)
Definition: tableam.c:112
static void table_endscan(TableScanDesc scan)
Definition: tableam.h:1009

References BTEqualStrategyNumber, CatalogTupleDelete(), ereport, errcode(), errdetail(), errhint(), errmsg(), ERROR, ForwardScanDirection, get_rel_name(), get_subscription_name(), GETSTRUCT, heap_getnext(), HeapTupleIsValid, ObjectIdGetDatum(), OidIsValid, RowExclusiveLock, ScanKeyInit(), HeapTupleData::t_self, table_beginscan_catalog(), table_close(), table_endscan(), and table_open().

Referenced by AlterSubscription_refresh(), DropSubscription(), and heap_drop_with_catalog().

◆ textarray_to_stringlist()

static List * textarray_to_stringlist ( ArrayType textarray)
static

Definition at line 211 of file pg_subscription.c.

212 {
213  Datum *elems;
214  int nelems,
215  i;
216  List *res = NIL;
217 
218  deconstruct_array_builtin(textarray, TEXTOID, &elems, NULL, &nelems);
219 
220  if (nelems == 0)
221  return NIL;
222 
223  for (i = 0; i < nelems; i++)
225 
226  return res;
227 }
void deconstruct_array_builtin(ArrayType *array, Oid elmtype, Datum **elemsp, bool **nullsp, int *nelemsp)
Definition: arrayfuncs.c:3679
int i
Definition: isn.c:73
String * makeString(char *str)
Definition: value.c:63

References deconstruct_array_builtin(), i, lappend(), makeString(), NIL, res, and TextDatumGetCString.

Referenced by GetSubscription().

◆ UpdateSubscriptionRelState()

void UpdateSubscriptionRelState ( Oid  subid,
Oid  relid,
char  state,
XLogRecPtr  sublsn 
)

Definition at line 279 of file pg_subscription.c.

281 {
282  Relation rel;
283  HeapTuple tup;
284  bool nulls[Natts_pg_subscription_rel];
285  Datum values[Natts_pg_subscription_rel];
286  bool replaces[Natts_pg_subscription_rel];
287 
288  LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
289 
290  rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
291 
292  /* Try finding existing mapping. */
294  ObjectIdGetDatum(relid),
295  ObjectIdGetDatum(subid));
296  if (!HeapTupleIsValid(tup))
297  elog(ERROR, "subscription table %u in subscription %u does not exist",
298  relid, subid);
299 
300  /* Update the tuple. */
301  memset(values, 0, sizeof(values));
302  memset(nulls, false, sizeof(nulls));
303  memset(replaces, false, sizeof(replaces));
304 
305  replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
306  values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
307 
308  replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
309  if (sublsn != InvalidXLogRecPtr)
310  values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
311  else
312  nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
313 
314  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
315  replaces);
316 
317  /* Update the catalog. */
318  CatalogTupleUpdate(rel, &tup->t_self, tup);
319 
320  /* Cleanup. */
321  table_close(rel, NoLock);
322 }

References AccessShareLock, CatalogTupleUpdate(), CharGetDatum(), elog(), ERROR, heap_modify_tuple(), HeapTupleIsValid, InvalidXLogRecPtr, LockSharedObject(), LSNGetDatum(), NoLock, ObjectIdGetDatum(), RelationGetDescr, RowExclusiveLock, SearchSysCacheCopy2, SUBSCRIPTIONRELMAP, HeapTupleData::t_self, table_close(), table_open(), and values.

Referenced by LogicalRepSyncTableStart(), process_syncing_tables_for_apply(), and process_syncing_tables_for_sync().