PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
subscriptioncmds.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * subscriptioncmds.c
4  * subscription catalog manipulation functions
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  * subscriptioncmds.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "miscadmin.h"
18 
19 #include "access/heapam.h"
20 #include "access/htup_details.h"
21 #include "access/xact.h"
22 
23 #include "catalog/dependency.h"
24 #include "catalog/indexing.h"
25 #include "catalog/namespace.h"
26 #include "catalog/objectaccess.h"
27 #include "catalog/objectaddress.h"
28 #include "catalog/pg_type.h"
31 
32 #include "commands/defrem.h"
33 #include "commands/event_trigger.h"
35 
36 #include "nodes/makefuncs.h"
37 
39 #include "replication/origin.h"
41 #include "replication/walsender.h"
43 
44 #include "storage/lmgr.h"
45 
46 #include "utils/builtins.h"
47 #include "utils/lsyscache.h"
48 #include "utils/memutils.h"
49 #include "utils/syscache.h"
50 
51 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
52 
53 /*
54  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
55  *
56  * Since not all options can be specified in both commands, this function
57  * will report an error on options if the target output pointer is NULL to
58  * accomodate that.
59  */
60 static void
61 parse_subscription_options(List *options, bool *connect, bool *enabled_given,
62  bool *enabled, bool *create_slot, char **slot_name,
63  bool *copy_data)
64 {
65  ListCell *lc;
66  bool connect_given = false;
67  bool create_slot_given = false;
68  bool copy_data_given = false;
69 
70  if (connect)
71  *connect = true;
72  if (enabled)
73  {
74  *enabled_given = false;
75  *enabled = true;
76  }
77  if (create_slot)
78  *create_slot = true;
79  if (slot_name)
80  *slot_name = NULL;
81  if (copy_data)
82  *copy_data = true;
83 
84  /* Parse options */
85  foreach (lc, options)
86  {
87  DefElem *defel = (DefElem *) lfirst(lc);
88 
89  if (strcmp(defel->defname, "noconnect") == 0 && connect)
90  {
91  if (connect_given)
92  ereport(ERROR,
93  (errcode(ERRCODE_SYNTAX_ERROR),
94  errmsg("conflicting or redundant options")));
95 
96  connect_given = true;
97  *connect = !defGetBoolean(defel);
98  }
99  else if (strcmp(defel->defname, "enabled") == 0 && enabled)
100  {
101  if (*enabled_given)
102  ereport(ERROR,
103  (errcode(ERRCODE_SYNTAX_ERROR),
104  errmsg("conflicting or redundant options")));
105 
106  *enabled_given = true;
107  *enabled = defGetBoolean(defel);
108  }
109  else if (strcmp(defel->defname, "disabled") == 0 && enabled)
110  {
111  if (*enabled_given)
112  ereport(ERROR,
113  (errcode(ERRCODE_SYNTAX_ERROR),
114  errmsg("conflicting or redundant options")));
115 
116  *enabled_given = true;
117  *enabled = !defGetBoolean(defel);
118  }
119  else if (strcmp(defel->defname, "create slot") == 0 && create_slot)
120  {
121  if (create_slot_given)
122  ereport(ERROR,
123  (errcode(ERRCODE_SYNTAX_ERROR),
124  errmsg("conflicting or redundant options")));
125 
126  create_slot_given = true;
127  *create_slot = defGetBoolean(defel);
128  }
129  else if (strcmp(defel->defname, "nocreate slot") == 0 && create_slot)
130  {
131  if (create_slot_given)
132  ereport(ERROR,
133  (errcode(ERRCODE_SYNTAX_ERROR),
134  errmsg("conflicting or redundant options")));
135 
136  create_slot_given = true;
137  *create_slot = !defGetBoolean(defel);
138  }
139  else if (strcmp(defel->defname, "slot name") == 0 && slot_name)
140  {
141  if (*slot_name)
142  ereport(ERROR,
143  (errcode(ERRCODE_SYNTAX_ERROR),
144  errmsg("conflicting or redundant options")));
145 
146  *slot_name = defGetString(defel);
147  }
148  else if (strcmp(defel->defname, "copy data") == 0 && copy_data)
149  {
150  if (copy_data_given)
151  ereport(ERROR,
152  (errcode(ERRCODE_SYNTAX_ERROR),
153  errmsg("conflicting or redundant options")));
154 
155  copy_data_given = true;
156  *copy_data = defGetBoolean(defel);
157  }
158  else if (strcmp(defel->defname, "nocopy data") == 0 && copy_data)
159  {
160  if (copy_data_given)
161  ereport(ERROR,
162  (errcode(ERRCODE_SYNTAX_ERROR),
163  errmsg("conflicting or redundant options")));
164 
165  copy_data_given = true;
166  *copy_data = !defGetBoolean(defel);
167  }
168  else
169  elog(ERROR, "unrecognized option: %s", defel->defname);
170  }
171 
172  /*
173  * We've been explicitly asked to not connect, that requires some
174  * additional processing.
175  */
176  if (connect && !*connect)
177  {
178  /* Check for incompatible options from the user. */
179  if (*enabled_given && *enabled)
180  ereport(ERROR,
181  (errcode(ERRCODE_SYNTAX_ERROR),
182  errmsg("noconnect and enabled are mutually exclusive options")));
183 
184  if (create_slot_given && *create_slot)
185  ereport(ERROR,
186  (errcode(ERRCODE_SYNTAX_ERROR),
187  errmsg("noconnect and create slot are mutually exclusive options")));
188 
189  if (copy_data_given && *copy_data)
190  ereport(ERROR,
191  (errcode(ERRCODE_SYNTAX_ERROR),
192  errmsg("noconnect and copy data are mutually exclusive options")));
193 
194  /* Change the defaults of other options. */
195  *enabled = false;
196  *create_slot = false;
197  *copy_data = false;
198  }
199 }
200 
201 /*
202  * Auxiliary function to return a text array out of a list of String nodes.
     *
     * Each name in publist is compared against the entries that precede it in
     * the list; a repeated name raises ERROR with ERRCODE_SYNTAX_ERROR.  The
     * per-element text datums are built while a temporary memory context is
     * current; the array itself is constructed after switching back to the
     * previous context, and the temporary context is then deleted.  The array
     * is returned as a Datum.
     *
     * NOTE(review): the line carrying the function name and most of the
     * memory-context creation call are absent from this listing; the symbol
     * index of this page names the function
     * publicationListToArray(List *publist).
203  */
204 static Datum
206 {
207  ArrayType *arr;
208  Datum *datums;
209  int j = 0;
210  ListCell *cell;
211  MemoryContext memcxt;
212  MemoryContext oldcxt;
213 
214  /* Create memory context for temporary allocations. */
216  "publicationListToArray to array",
220  oldcxt = MemoryContextSwitchTo(memcxt);
221 
     /*
      * NOTE(review): the array holds Datum values but is sized with
      * sizeof(text *); confirm the two sizes agree on every supported
      * platform.
      */
222  datums = palloc(sizeof(text *) * list_length(publist));
223  foreach(cell, publist)
224  {
225  char *name = strVal(lfirst(cell));
226  ListCell *pcell;
227 
228  /* Check for duplicates. */
229  foreach(pcell, publist)
230  {
231  char *pname = strVal(lfirst(pcell));
232 
     /* Reached the current entry itself: only earlier entries are compared. */
233  if (name == pname)
234  break;
235 
236  if (strcmp(name, pname) == 0)
237  ereport(ERROR,
238  (errcode(ERRCODE_SYNTAX_ERROR),
239  errmsg("publication name \"%s\" used more than once",
240  pname)));
241  }
242 
243  datums[j++] = CStringGetTextDatum(name);
244  }
245 
     /* Switch back first so the result is not built in the temporary context. */
246  MemoryContextSwitchTo(oldcxt);
247 
248  arr = construct_array(datums, list_length(publist),
249  TEXTOID, -1, false, 'i');
250  MemoryContextDelete(memcxt);
251 
252  return PointerGetDatum(arr);
253 }
254 
255 /*
256  * Create new subscription.
     *
     * Visible flow: parse stmt->options, refuse to run inside a transaction
     * block when a replication slot is to be created, require superuser,
     * reject an already-used subscription name, validate the connection
     * string, insert the new catalog tuple, and create a replication origin
     * named "pg_<subid>".  Unless noconnect was requested, it then connects to
     * the publisher, optionally creates the slot, and records an initial sync
     * state for every table the publications contain.  Returns "myself".
     *
     * NOTE(review): this listing is missing many source lines, including the
     * function signature; the symbol index of this page gives it as
     * ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt,
     * bool isTopLevel).
257  */
260 {
261  Relation rel;
262  ObjectAddress myself;
263  Oid subid;
264  bool nulls[Natts_pg_subscription];
266  Oid owner = GetUserId();
267  HeapTuple tup;
268  bool connect;
269  bool enabled_given;
270  bool enabled;
271  bool copy_data;
272  char *conninfo;
273  char *slotname;
274  char originname[NAMEDATALEN];
275  bool create_slot;
276  List *publications;
277 
278  /*
279  * Parse and check options.
280  * Connection and publication should not be specified here.
281  */
282  parse_subscription_options(stmt->options, &connect, &enabled_given,
283  &enabled, &create_slot, &slotname, &copy_data);
284 
285  /*
286  * Since creating a replication slot is not transactional, rolling back
287  * the transaction leaves the created replication slot. So we cannot run
288  * CREATE SUBSCRIPTION inside a transaction block if creating a
289  * replication slot.
290  */
291  if (create_slot)
292  PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... CREATE SLOT");
293 
294  if (!superuser())
295  ereport(ERROR,
296  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
297  (errmsg("must be superuser to create subscriptions"))));
298 
300 
301  /* Check if name is used */
303  CStringGetDatum(stmt->subname));
304  if (OidIsValid(subid))
305  {
306  ereport(ERROR,
308  errmsg("subscription \"%s\" already exists",
309  stmt->subname)));
310  }
311 
     /* No slot name option was given: fall back to the subscription name. */
312  if (slotname == NULL)
313  slotname = stmt->subname;
314 
315  conninfo = stmt->conninfo;
316  publications = stmt->publication;
317 
318  /* Load the library providing us libpq calls. */
319  load_file("libpqwalreceiver", false);
320 
321  /* Check the connection info string. */
322  walrcv_check_conninfo(conninfo);
323 
324  /* Everything ok, form a new tuple. */
325  memset(values, 0, sizeof(values));
326  memset(nulls, false, sizeof(nulls));
327 
329  values[Anum_pg_subscription_subname - 1] =
331  values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
332  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
334  CStringGetTextDatum(conninfo);
338  publicationListToArray(publications);
339 
340  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
341 
342  /* Insert tuple into catalog. */
343  subid = CatalogTupleInsert(rel, tup);
344  heap_freetuple(tup);
345 
347 
     /* The replication origin is named after the new subscription's OID. */
348  snprintf(originname, sizeof(originname), "pg_%u", subid);
349  replorigin_create(originname);
350 
351  /*
352  * Connect to remote side to execute requested commands and fetch table
353  * info.
354  */
355  if (connect)
356  {
357  XLogRecPtr lsn;
358  char *err;
360  List *tables;
361  ListCell *lc;
362  char table_state;
363 
364  /* Try to connect to the publisher. */
365  wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
366  if (!wrconn)
367  ereport(ERROR,
368  (errmsg("could not connect to the publisher: %s", err)));
369 
370  PG_TRY();
371  {
372  /*
373  * If requested, create permanent slot for the subscription.
374  * We won't use the initial snapshot for anything, so no need
375  * to export it.
376  */
377  if (create_slot)
378  {
379  walrcv_create_slot(wrconn, slotname, false,
380  CRS_NOEXPORT_SNAPSHOT, &lsn);
381  ereport(NOTICE,
382  (errmsg("created replication slot \"%s\" on publisher",
383  slotname)));
384  }
385 
386  /*
387  * Set sync state based on if we were asked to do data copy or
388  * not.
389  */
390  table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
391 
392  /*
393  * Get the table list from publisher and build local table status
394  * info.
395  */
396  tables = fetch_table_list(wrconn, publications);
397  foreach (lc, tables)
398  {
399  RangeVar *rv = (RangeVar *) lfirst(lc);
400  Oid relid;
401 
     /* false is passed for missing_ok when resolving the local table. */
402  relid = RangeVarGetRelid(rv, AccessShareLock, false);
403 
404  SetSubscriptionRelState(subid, relid, table_state,
406  }
407 
408  ereport(NOTICE,
409  (errmsg("synchronized table states")));
410  }
411  PG_CATCH();
412  {
413  /* Close the connection in case of failure. */
414  walrcv_disconnect(wrconn);
415  PG_RE_THROW();
416  }
417  PG_END_TRY();
418 
419  /* And we are done with the remote side. */
420  walrcv_disconnect(wrconn);
421  }
422  else
424  (errmsg("tables were not subscribed, you will have to run "
425  "ALTER SUBSCRIPTION ... REFRESH PUBLICATION to "
426  "subscribe the tables")));
427 
429 
431 
433 
435 
436  return myself;
437 }
438 
/*
 * Bring the subscription's tracked tables in line with what the publisher
 * currently publishes.
 *
 * Connects to the publisher using sub->conninfo, fetches the table list for
 * sub->publications, and compares it with the relations returned by
 * GetSubscriptionRelations(sub->oid).  Published tables that are not yet
 * tracked get a state entry through SetSubscriptionRelState(); tracked
 * tables that are no longer published are dropped with
 * RemoveSubscriptionRel().  A NOTICE is emitted for each addition and
 * removal.
 *
 * NOTE(review): this listing is missing several source lines, including the
 * one carrying the function name and parameters; the symbol index of this
 * page gives the signature as
 * AlterSubscription_refresh(Subscription *sub, bool copy_data).
 */
439 static void
441 {
442  char *err;
443  List *pubrel_names;
444  List *subrel_states;
445  Oid *subrel_local_oids;
446  Oid *pubrel_local_oids;
447  ListCell *lc;
448  int off;
449 
450  /* Load the library providing us libpq calls. */
451  load_file("libpqwalreceiver", false);
452 
453  /* Try to connect to the publisher. */
454  wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
455  if (!wrconn)
456  ereport(ERROR,
457  (errmsg("could not connect to the publisher: %s", err)));
458 
459  /* Get the table list from publisher. */
460  pubrel_names = fetch_table_list(wrconn, sub->publications);
461 
462  /* We are done with the remote side, close connection. */
464 
465  /* Get local table list. */
466  subrel_states = GetSubscriptionRelations(sub->oid);
467 
468  /*
469  * Build qsorted array of local table oids for faster lookup.
470  * This can potentially contain all tables in the database so
471  * speed of lookup is important.
472  */
473  subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
474  off = 0;
475  foreach(lc, subrel_states)
476  {
478  subrel_local_oids[off++] = relstate->relid;
479  }
480  qsort(subrel_local_oids, list_length(subrel_states),
481  sizeof(Oid), oid_cmp);
482 
483  /*
484  * Walk over the remote tables and try to match them to locally
485  * known tables. If the table is not known locally create a new state
486  * for it.
487  *
488  * Also builds array of local oids of remote tables for the next step.
489  */
490  off = 0;
491  pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
492 
493  foreach (lc, pubrel_names)
494  {
495  RangeVar *rv = (RangeVar *) lfirst(lc);
496  Oid relid;
497 
     /* false is passed for missing_ok when resolving the local table. */
498  relid = RangeVarGetRelid(rv, AccessShareLock, false);
499  pubrel_local_oids[off++] = relid;
500 
     /* Not found among the tracked OIDs: this table is new to the subscription. */
501  if (!bsearch(&relid, subrel_local_oids,
502  list_length(subrel_states), sizeof(Oid), oid_cmp))
503  {
504  SetSubscriptionRelState(sub->oid, relid,
507  ereport(NOTICE,
508  (errmsg("added subscription for table %s.%s",
510  quote_identifier(rv->relname))));
511  }
512  }
513 
514  /*
515  * Next remove state for tables we should not care about anymore using
516  * the data we collected above
517  */
518  qsort(pubrel_local_oids, list_length(pubrel_names),
519  sizeof(Oid), oid_cmp);
520 
521  for (off = 0; off < list_length(subrel_states); off++)
522  {
523  Oid relid = subrel_local_oids[off];
524 
     /* Tracked locally but absent from the publisher's list: drop its state. */
525  if (!bsearch(&relid, pubrel_local_oids,
526  list_length(pubrel_names), sizeof(Oid), oid_cmp))
527  {
528  char *namespace;
529 
530  RemoveSubscriptionRel(sub->oid, relid);
531 
532  namespace = get_namespace_name(get_rel_namespace(relid));
533  ereport(NOTICE,
534  (errmsg("removed subscription for table %s.%s",
535  quote_identifier(namespace),
536  quote_identifier(get_rel_name(relid)))));
537  }
538  }
539 }
540 
541 /*
542  * Alter the existing subscription.
     *
     * Looks up the subscription named stmt->subname (ERROR with
     * ERRCODE_UNDEFINED_OBJECT if it does not exist) and dispatches on
     * stmt->kind.  Branches that change catalog columns fill values[] and
     * replaces[] and set update_tuple, in which case the tuple is rebuilt with
     * heap_modify_tuple() and written with CatalogTupleUpdate().  Two branches
     * also call AlterSubscription_refresh().  Returns "myself".
     *
     * NOTE(review): this listing is missing many source lines, including the
     * function signature, every case label and most of the option-parsing
     * calls; the symbol index of this page gives the signature as
     * ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt).
543  */
546 {
547  Relation rel;
548  ObjectAddress myself;
549  bool nulls[Natts_pg_subscription];
550  bool replaces[Natts_pg_subscription];
552  HeapTuple tup;
553  Oid subid;
554  bool update_tuple = false;
555 
557 
558  /* Fetch the existing tuple. */
560  CStringGetDatum(stmt->subname));
561 
562  if (!HeapTupleIsValid(tup))
563  ereport(ERROR,
564  (errcode(ERRCODE_UNDEFINED_OBJECT),
565  errmsg("subscription \"%s\" does not exist",
566  stmt->subname)));
567 
568  /* must be owner */
571  stmt->subname);
572 
573  subid = HeapTupleGetOid(tup);
574 
575  /* Form a new tuple. */
576  memset(values, 0, sizeof(values));
577  memset(nulls, false, sizeof(nulls));
578  memset(replaces, false, sizeof(replaces));
579 
580  switch (stmt->kind)
581  {
     /* This branch replaces the subslotname column. */
583  {
584  char *slot_name;
585 
587  NULL, &slot_name, NULL);
588 
591  replaces[Anum_pg_subscription_subslotname - 1] = true;
592 
593  update_tuple = true;
594  break;
595  }
596 
     /* This branch replaces the subenabled column. */
598  {
599  bool enabled,
600  enabled_given;
601 
603  &enabled_given, &enabled, NULL,
604  NULL, NULL);
605  Assert(enabled_given);
606 
607  values[Anum_pg_subscription_subenabled - 1] =
608  BoolGetDatum(enabled);
609  replaces[Anum_pg_subscription_subenabled - 1] = true;
610 
611  update_tuple = true;
612  break;
613  }
614 
     /* This branch replaces the subconninfo column. */
618  replaces[Anum_pg_subscription_subconninfo - 1] = true;
619  update_tuple = true;
620  break;
621 
     /* This branch replaces the subpublications column and may refresh. */
624  {
625  bool copy_data;
626  Subscription *sub = GetSubscription(subid, false);
627 
629  NULL, NULL, &copy_data);
630 
633  replaces[Anum_pg_subscription_subpublications - 1] = true;
634 
635  update_tuple = true;
636 
637  /* Refresh if user asked us to. */
639  {
640  /* Make sure refresh sees the new list of publications. */
641  sub->publications = stmt->publication;
642 
643  AlterSubscription_refresh(sub, copy_data);
644  }
645 
646  break;
647  }
648 
     /* This branch only refreshes; it leaves update_tuple false. */
650  {
651  bool copy_data;
652  Subscription *sub = GetSubscription(subid, false);
653 
655  NULL, NULL, &copy_data);
656 
657  AlterSubscription_refresh(sub, copy_data);
658 
659  break;
660  }
661 
662  default:
663  elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
664  stmt->kind);
665  }
666 
667  /* Update the catalog if needed. */
668  if (update_tuple)
669  {
670  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
671  replaces);
672 
673  CatalogTupleUpdate(rel, &tup->t_self, tup);
674 
675  heap_freetuple(tup);
676  }
677 
679 
681 
683 
684  return myself;
685 }
686 
687 /*
688  * Drop a subscription
     *
     * Removes the pg_subscription tuple for stmt->subname and the replication
     * origin named "pg_<subid>" if one exists.  When stmt->drop_slot is set it
     * additionally connects to the publisher and issues DROP_REPLICATION_SLOT
     * for the subscription's slot; that mode may not run inside a transaction
     * block.  If the subscription does not exist, raises ERROR, or, with
     * stmt->missing_ok, emits a NOTICE and returns.
     *
     * NOTE(review): this listing is missing several source lines (for example
     * the catalog open, the syscache lookup and the attribute-number arguments
     * of the SysCacheGetAttr calls).
689  */
690 void
691 DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
692 {
693  Relation rel;
694  ObjectAddress myself;
695  HeapTuple tup;
696  Oid subid;
697  Datum datum;
698  bool isnull;
699  char *subname;
700  char *conninfo;
701  char *slotname;
702  char originname[NAMEDATALEN];
703  char *err = NULL;
704  RepOriginId originid;
706  StringInfoData cmd;
707 
708  /*
709  * Since dropping a replication slot is not transactional, the replication
710  * slot stays dropped even if the transaction rolls back. So we cannot
711  * run DROP SUBSCRIPTION inside a transaction block if dropping the
712  * replication slot.
713  */
714  if (stmt->drop_slot)
715  PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION ... DROP SLOT");
716 
717  /*
718  * Lock pg_subscription with AccessExclusiveLock to ensure
719  * that the launcher doesn't restart new worker during dropping
720  * the subscription
721  */
723 
725  CStringGetDatum(stmt->subname));
726 
727  if (!HeapTupleIsValid(tup))
728  {
729  heap_close(rel, NoLock);
730 
731  if (!stmt->missing_ok)
732  ereport(ERROR,
733  (errcode(ERRCODE_UNDEFINED_OBJECT),
734  errmsg("subscription \"%s\" does not exist",
735  stmt->subname)));
736  else
737  ereport(NOTICE,
738  (errmsg("subscription \"%s\" does not exist, skipping",
739  stmt->subname)));
740 
741  return;
742  }
743 
744  subid = HeapTupleGetOid(tup);
745 
746  /* must be owner */
747  if (!pg_subscription_ownercheck(subid, GetUserId()))
749  stmt->subname);
750 
751  /* DROP hook for the subscription being removed */
753 
754  /*
755  * Lock the subscription so nobody else can do anything with it
756  * (including the replication workers).
757  */
759 
     /*
      * The three values below are copied out of the tuple before it is
      * released further down; they are still needed for the slot drop.
      */
760  /* Get subname */
761  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
763  Assert(!isnull);
764  subname = pstrdup(NameStr(*DatumGetName(datum)));
765 
766  /* Get conninfo */
767  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
769  Assert(!isnull);
770  conninfo = pstrdup(TextDatumGetCString(datum));
771 
772  /* Get slotname */
773  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
775  Assert(!isnull);
776  slotname = pstrdup(NameStr(*DatumGetName(datum)));
777 
779  EventTriggerSQLDropAddObject(&myself, true, true);
780 
781  /* Remove the tuple from catalog. */
782  CatalogTupleDelete(rel, &tup->t_self);
783 
784  ReleaseSysCache(tup);
785 
786  /* Clean up dependencies */
788 
789  /* Remove any associated relation synchronization states. */
791 
792  /* Kill the apply worker so that the slot becomes accessible. */
794 
795  /* Remove the origin tracking if exists. */
796  snprintf(originname, sizeof(originname), "pg_%u", subid);
797  originid = replorigin_by_name(originname, true);
798  if (originid != InvalidRepOriginId)
799  replorigin_drop(originid);
800 
801  /* If the user asked to not drop the slot, we are done now. */
802  if (!stmt->drop_slot)
803  {
804  heap_close(rel, NoLock);
805  return;
806  }
807 
808  /*
809  * Otherwise drop the replication slot at the publisher node using
810  * the replication connection.
811  */
812  load_file("libpqwalreceiver", false);
813 
     /*
      * NOTE(review): the slot name is placed between double quotes without
      * any escaping; presumably slot names cannot contain a double quote --
      * confirm, or quote the identifier properly.
      */
814  initStringInfo(&cmd);
815  appendStringInfo(&cmd, "DROP_REPLICATION_SLOT \"%s\"", slotname);
816 
817  wrconn = walrcv_connect(conninfo, true, subname, &err);
818  if (wrconn == NULL)
819  ereport(ERROR,
820  (errmsg("could not connect to publisher when attempting to "
821  "drop the replication slot \"%s\"", slotname),
822  errdetail("The error was: %s", err)));
823 
824  PG_TRY();
825  {
826  WalRcvExecResult *res;
827  res = walrcv_exec(wrconn, cmd.data, 0, NULL);
828 
829  if (res->status != WALRCV_OK_COMMAND)
830  ereport(ERROR,
831  (errmsg("could not drop the replication slot \"%s\" on publisher",
832  slotname),
833  errdetail("The error was: %s", res->err)));
834  else
835  ereport(NOTICE,
836  (errmsg("dropped replication slot \"%s\" on publisher",
837  slotname)));
838 
839  walrcv_clear_result(res);
840  }
841  PG_CATCH();
842  {
843  /* Close the connection in case of failure */
844  walrcv_disconnect(wrconn);
845  PG_RE_THROW();
846  }
847  PG_END_TRY();
848 
849  walrcv_disconnect(wrconn);
850 
851  pfree(cmd.data);
852 
853  heap_close(rel, NoLock);
854 }
855 
856 /*
857  * Internal workhorse for changing a subscription owner
     *
     * Does nothing when the subscription is already owned by newOwnerId.
     * Otherwise it requires newOwnerId to be a superuser (ERROR with
     * ERRCODE_INSUFFICIENT_PRIVILEGE if not), stores the new owner in the
     * tuple's subowner field and writes the tuple back with
     * CatalogTupleUpdate().
     *
     * NOTE(review): this listing is missing several source lines, including
     * the function signature; the symbol index of this page gives it as
     * AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup,
     * Oid newOwnerId).
858  */
859 static void
861 {
863 
864  form = (Form_pg_subscription) GETSTRUCT(tup);
865 
     /* Already owned by the requested role: nothing to change. */
866  if (form->subowner == newOwnerId)
867  return;
868 
871  NameStr(form->subname));
872 
873  /* New owner must be a superuser */
874  if (!superuser_arg(newOwnerId))
875  ereport(ERROR,
876  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
877  errmsg("permission denied to change owner of subscription \"%s\"",
878  NameStr(form->subname)),
879  errhint("The owner of an subscription must be a superuser.")));
880 
     /* The tuple is modified in place and then written back. */
881  form->subowner = newOwnerId;
882  CatalogTupleUpdate(rel, &tup->t_self, tup);
883 
884  /* Update owner dependency reference */
886  HeapTupleGetOid(tup),
887  newOwnerId);
888 
890  HeapTupleGetOid(tup), 0);
891 }
892 
893 /*
894  * Change subscription owner -- by name
     *
     * Looks the subscription up by "name" (ERROR with
     * ERRCODE_UNDEFINED_OBJECT if it does not exist), delegates the ownership
     * change to AlterSubscriptionOwner_internal(), and returns the object
     * address built from SubscriptionRelationId and the subscription's OID.
     *
     * NOTE(review): this listing is missing several source lines, including
     * the return-type line, the catalog open and close, and the start of the
     * tuple lookup; the symbol index of this page gives the return type as
     * ObjectAddress.
895  */
897 AlterSubscriptionOwner(const char *name, Oid newOwnerId)
898 {
899  Oid subid;
900  HeapTuple tup;
901  Relation rel;
902  ObjectAddress address;
903 
905 
907  CStringGetDatum(name));
908 
909  if (!HeapTupleIsValid(tup))
910  ereport(ERROR,
911  (errcode(ERRCODE_UNDEFINED_OBJECT),
912  errmsg("subscription \"%s\" does not exist", name)));
913 
914  subid = HeapTupleGetOid(tup);
915 
916  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
917 
918  ObjectAddressSet(address, SubscriptionRelationId, subid);
919 
     /* The tuple is freed here once the change has been applied. */
920  heap_freetuple(tup);
921 
923 
924  return address;
925 }
926 
927 /*
928  * Change subscription owner -- by OID
     *
     * Raises ERROR with ERRCODE_UNDEFINED_OBJECT when no valid tuple was
     * found for subid; otherwise delegates the ownership change to
     * AlterSubscriptionOwner_internal() and frees the tuple.
     *
     * NOTE(review): this listing is missing several source lines, including
     * the function signature, the catalog open and close, and the tuple
     * lookup; the symbol index of this page gives the signature as
     * void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId).
929  */
930 void
932 {
933  HeapTuple tup;
934  Relation rel;
935 
937 
939 
940  if (!HeapTupleIsValid(tup))
941  ereport(ERROR,
942  (errcode(ERRCODE_UNDEFINED_OBJECT),
943  errmsg("subscription with OID %u does not exist", subid)));
944 
945  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
946 
947  heap_freetuple(tup);
948 
950 }
951 
952 /*
953  * Get the list of tables which belong to specified publications on the
954  * publisher connection.
     *
     * Builds a SELECT DISTINCT over pg_catalog.pg_publication_tables that is
     * restricted to the given publication names (each added as a quoted
     * literal), runs it through walrcv_exec() expecting two text columns, and
     * returns a List of RangeVar nodes, one per (schemaname, tablename) row.
     * Raises ERROR when the result status is not WALRCV_OK_TUPLES.  The
     * publications list must not be empty (asserted).
     *
     * NOTE(review): the line carrying the function name and one statement
     * after the row loop are absent from this listing; the symbol index of
     * this page gives the signature as
     * fetch_table_list(WalReceiverConn *wrconn, List *publications).
955  */
956 static List *
958 {
959  WalRcvExecResult *res;
960  StringInfoData cmd;
961  TupleTableSlot *slot;
962  Oid tableRow[2] = {TEXTOID, TEXTOID};
963  ListCell *lc;
964  bool first;
965  List *tablelist = NIL;
966 
967  Assert(list_length(publications) > 0);
968 
969  initStringInfo(&cmd);
970  appendStringInfo(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
971  " FROM pg_catalog.pg_publication_tables t\n"
972  " WHERE t.pubname IN (");
973  first = true;
974  foreach (lc, publications)
975  {
976  char *pubname = strVal(lfirst(lc));
977 
     /* Separate the entries of the IN list with commas. */
978  if (first)
979  first = false;
980  else
981  appendStringInfoString(&cmd, ", ");
982 
983  appendStringInfo(&cmd, "%s", quote_literal_cstr(pubname));
984  }
985  appendStringInfoString(&cmd, ")");
986 
987  res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
988  pfree(cmd.data);
989 
990  if (res->status != WALRCV_OK_TUPLES)
991  ereport(ERROR,
992  (errmsg("could not receive list of replicated tables from the publisher: %s",
993  res->err)));
994 
995  /* Process tables. */
996  slot = MakeSingleTupleTableSlot(res->tupledesc);
997  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
998  {
999  char *nspname;
1000  char *relname;
1001  bool isnull;
1002  RangeVar *rv;
1003 
1004  nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1005  Assert(!isnull);
1006  relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1007  Assert(!isnull);
1008 
      /*
       * NOTE(review): both strings are copied again with pstrdup();
       * presumably TextDatumGetCString already returns a fresh copy --
       * verify whether the second copy is needed.
       */
1009  rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
1010  tablelist = lappend(tablelist, rv);
1011 
1012  ExecClearTuple(slot);
1013  }
1015 
1016  walrcv_clear_result(res);
1017 
1018  return tablelist;
1019 }
#define connect(s, name, namelen)
Definition: win32.h:383
#define NIL
Definition: pg_list.h:69
#define SUBREL_STATE_INIT
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
WalReceiverConn * wrconn
Definition: worker.c:107
ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:46
int errhint(const char *fmt,...)
Definition: elog.c:987
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
#define Anum_pg_subscription_subpublications
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:145
void RemoveSubscriptionRel(Oid subid, Oid relid)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10185
#define Anum_pg_subscription_subowner
#define RelationGetDescr(relation)
Definition: rel.h:429
Oid GetUserId(void)
Definition: miscinit.c:283
#define TEXTOID
Definition: pg_type.h:324
#define PointerGetDatum(X)
Definition: postgres.h:562
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:53
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:244
char * pstrdup(const char *in)
Definition: mcxt.c:1077
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:154
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ArrayType * construct_array(Datum *elems, int nelems, Oid elmtype, int elmlen, bool elmbyval, char elmalign)
Definition: arrayfuncs.c:3306
#define AccessShareLock
Definition: lockdefs.h:36
uint16 RepOriginId
Definition: xlogdefs.h:51
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
bool superuser(void)
Definition: superuser.c:47
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:255
static Datum publicationListToArray(List *publist)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:584
FormData_pg_subscription * Form_pg_subscription
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:159
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1374
unsigned int Oid
Definition: postgres_ext.h:31
ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:260
#define OidIsValid(objectId)
Definition: c.h:538
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:206
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:162
#define GetSysCacheOid2(cacheId, key1, key2)
Definition: syscache.h:181
char * schemaname
Definition: primnodes.h:67
#define NAMEDATALEN
#define DatumGetName(X)
Definition: postgres.h:591
char * relname
Definition: primnodes.h:68
Subscription * GetSubscription(Oid subid, bool missing_ok)
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:304
bool defGetBoolean(DefElem *def)
Definition: define.c:111
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:268
void pfree(void *pointer)
Definition: mcxt.c:950
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:110
TupleDesc tupledesc
Definition: walreceiver.h:190
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:162
#define Natts_pg_subscription
#define Anum_pg_subscription_subname
char * defGetString(DefElem *def)
Definition: define.c:49
ItemPointerData t_self
Definition: htup.h:65
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:189
#define SubscriptionRelationId
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3006
#define NoLock
Definition: lockdefs.h:34
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:216
AlterSubscriptionType kind
Definition: parsenodes.h:3361
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3399
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:328
#define RowExclusiveLock
Definition: lockdefs.h:38
int errdetail(const char *fmt,...)
Definition: elog.c:873
List * GetSubscriptionRelations(Oid subid)
#define CStringGetDatum(X)
Definition: postgres.h:584
List * publications
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
RepOriginId replorigin_create(char *roname)
Definition: origin.c:235
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc)
Definition: execTuples.c:199
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
#define ereport(elevel, rest)
Definition: elog.h:122
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:163
bool superuser_arg(Oid roleid)
Definition: superuser.c:57
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:823
#define Anum_pg_subscription_subslotname
List * lappend(List *list, void *datum)
Definition: list.c:128
void initStringInfo(StringInfo str)
Definition: stringinfo.c:65
#define Anum_pg_subscription_subdbid
#define WARNING
Definition: elog.h:40
void replorigin_drop(RepOriginId roident)
Definition: origin.c:327
#define TextDatumGetCString(d)
Definition: builtins.h:92
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
uintptr_t Datum
Definition: postgres.h:372
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1116
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1278
Oid MyDatabaseId
Definition: globals.c:76
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1287
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, char **slot_name, bool *copy_data)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:871
#define BoolGetDatum(X)
Definition: postgres.h:408
#define InvalidOid
Definition: postgres_ext.h:36
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define NOTICE
Definition: elog.h:37
#define Anum_pg_subscription_subconninfo
#define PG_CATCH()
Definition: elog.h:293
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1061
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
Tuplestorestate * tuplestore
Definition: walreceiver.h:189
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
#define lfirst(lc)
Definition: pg_list.h:106
WalRcvExecStatus status
Definition: walreceiver.h:187
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:133
int oid_cmp(const void *p1, const void *p2)
Definition: oid.c:336
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:210
static int list_length(const List *l)
Definition: pg_list.h:89
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
#define PG_RE_THROW()
Definition: elog.h:314
#define walrcv_disconnect(conn)
Definition: walreceiver.h:264
const char * name
Definition: encode.c:521
#define InvalidRepOriginId
Definition: origin.h:34
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define Anum_pg_subscription_subenabled
static Datum values[MAXATTR]
Definition: bootstrap.c:162
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:161
#define AccessExclusiveLock
Definition: lockdefs.h:46
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:163
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: heaptuple.c:1143
#define NameStr(name)
Definition: c.h:499
#define CStringGetTextDatum(s)
Definition: builtins.h:91
Definition: c.h:439
char * defname
Definition: parsenodes.h:708
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:164
#define elog
Definition: elog.h:219
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
#define qsort(a, b, c, d)
Definition: port.h:440
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:615
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:163
#define PG_TRY()
Definition: elog.h:284
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:793
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:31
Definition: pg_list.h:45
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1694
Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
#define SUBREL_STATE_READY
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition: makefuncs.c:419
#define PG_END_TRY()
Definition: elog.h:300
static void AlterSubscription_refresh(Subscription *sub, bool copy_data)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:262
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3154
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5110
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
#define SearchSysCache2(cacheId, key1, key2)
Definition: syscache.h:154