PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
subscriptioncmds.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * subscriptioncmds.c
4  * subscription catalog manipulation functions
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  * subscriptioncmds.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
#include "postgres.h"

#include "miscadmin.h"

#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/xact.h"

#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/objectaccess.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_type.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"

#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h"

#include "executor/executor.h"

#include "nodes/makefuncs.h"

#include "replication/logicallauncher.h"
#include "replication/origin.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/worker_internal.h"

#include "storage/lmgr.h"

#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
53 
54 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
55 
56 /*
57  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
58  *
59  * Since not all options can be specified in both commands, this function
60  * will report an error on options if the target output pointer is NULL to
61  * accommodate that.
62  */
63 static void
64 parse_subscription_options(List *options, bool *connect, bool *enabled_given,
65  bool *enabled, bool *create_slot,
66  bool *slot_name_given, char **slot_name,
67  bool *copy_data, char **synchronous_commit,
68  bool *refresh)
69 {
70  ListCell *lc;
71  bool connect_given = false;
72  bool create_slot_given = false;
73  bool copy_data_given = false;
74  bool refresh_given = false;
75 
76  /* If connect is specified, the others also need to be. */
77  Assert(!connect || (enabled && create_slot && copy_data));
78 
79  if (connect)
80  *connect = true;
81  if (enabled)
82  {
83  *enabled_given = false;
84  *enabled = true;
85  }
86  if (create_slot)
87  *create_slot = true;
88  if (slot_name)
89  {
90  *slot_name_given = false;
91  *slot_name = NULL;
92  }
93  if (copy_data)
94  *copy_data = true;
95  if (synchronous_commit)
96  *synchronous_commit = NULL;
97  if (refresh)
98  *refresh = true;
99 
100  /* Parse options */
101  foreach(lc, options)
102  {
103  DefElem *defel = (DefElem *) lfirst(lc);
104 
105  if (strcmp(defel->defname, "connect") == 0 && connect)
106  {
107  if (connect_given)
108  ereport(ERROR,
109  (errcode(ERRCODE_SYNTAX_ERROR),
110  errmsg("conflicting or redundant options")));
111 
112  connect_given = true;
113  *connect = defGetBoolean(defel);
114  }
115  else if (strcmp(defel->defname, "enabled") == 0 && enabled)
116  {
117  if (*enabled_given)
118  ereport(ERROR,
119  (errcode(ERRCODE_SYNTAX_ERROR),
120  errmsg("conflicting or redundant options")));
121 
122  *enabled_given = true;
123  *enabled = defGetBoolean(defel);
124  }
125  else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
126  {
127  if (create_slot_given)
128  ereport(ERROR,
129  (errcode(ERRCODE_SYNTAX_ERROR),
130  errmsg("conflicting or redundant options")));
131 
132  create_slot_given = true;
133  *create_slot = defGetBoolean(defel);
134  }
135  else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
136  {
137  if (*slot_name_given)
138  ereport(ERROR,
139  (errcode(ERRCODE_SYNTAX_ERROR),
140  errmsg("conflicting or redundant options")));
141 
142  *slot_name_given = true;
143  *slot_name = defGetString(defel);
144 
145  /* Setting slot_name = NONE is treated as no slot name. */
146  if (strcmp(*slot_name, "none") == 0)
147  *slot_name = NULL;
148  }
149  else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
150  {
151  if (copy_data_given)
152  ereport(ERROR,
153  (errcode(ERRCODE_SYNTAX_ERROR),
154  errmsg("conflicting or redundant options")));
155 
156  copy_data_given = true;
157  *copy_data = defGetBoolean(defel);
158  }
159  else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
160  synchronous_commit)
161  {
162  if (*synchronous_commit)
163  ereport(ERROR,
164  (errcode(ERRCODE_SYNTAX_ERROR),
165  errmsg("conflicting or redundant options")));
166 
167  *synchronous_commit = defGetString(defel);
168 
169  /* Test if the given value is valid for synchronous_commit GUC. */
170  (void) set_config_option("synchronous_commit", *synchronous_commit,
172  false, 0, false);
173  }
174  else if (strcmp(defel->defname, "refresh") == 0 && refresh)
175  {
176  if (refresh_given)
177  ereport(ERROR,
178  (errcode(ERRCODE_SYNTAX_ERROR),
179  errmsg("conflicting or redundant options")));
180 
181  refresh_given = true;
182  *refresh = defGetBoolean(defel);
183  }
184  else
185  ereport(ERROR,
186  (errcode(ERRCODE_SYNTAX_ERROR),
187  errmsg("unrecognized subscription parameter: %s", defel->defname)));
188  }
189 
190  /*
191  * We've been explicitly asked to not connect, that requires some
192  * additional processing.
193  */
194  if (connect && !*connect)
195  {
196  /* Check for incompatible options from the user. */
197  if (enabled && *enabled_given && *enabled)
198  ereport(ERROR,
199  (errcode(ERRCODE_SYNTAX_ERROR),
200  errmsg("connect = false and enabled = true are mutually exclusive options")));
201 
202  if (create_slot && create_slot_given && *create_slot)
203  ereport(ERROR,
204  (errcode(ERRCODE_SYNTAX_ERROR),
205  errmsg("connect = false and create_slot = true are mutually exclusive options")));
206 
207  if (copy_data && copy_data_given && *copy_data)
208  ereport(ERROR,
209  (errcode(ERRCODE_SYNTAX_ERROR),
210  errmsg("connect = false and copy_data = true are mutually exclusive options")));
211 
212  /* Change the defaults of other options. */
213  *enabled = false;
214  *create_slot = false;
215  *copy_data = false;
216  }
217 
218  /*
219  * Do additional checking for disallowed combination when slot_name = NONE
220  * was used.
221  */
222  if (slot_name && *slot_name_given && !*slot_name)
223  {
224  if (enabled && *enabled_given && *enabled)
225  ereport(ERROR,
226  (errcode(ERRCODE_SYNTAX_ERROR),
227  errmsg("slot_name = NONE and enabled = true are mutually exclusive options")));
228 
229  if (create_slot && create_slot_given && *create_slot)
230  ereport(ERROR,
231  (errcode(ERRCODE_SYNTAX_ERROR),
232  errmsg("slot_name = NONE and create_slot = true are mutually exclusive options")));
233 
234  if (enabled && !*enabled_given && *enabled)
235  ereport(ERROR,
236  (errcode(ERRCODE_SYNTAX_ERROR),
237  errmsg("subscription with slot_name = NONE must also set enabled = false")));
238 
239  if (create_slot && !create_slot_given && *create_slot)
240  ereport(ERROR,
241  (errcode(ERRCODE_SYNTAX_ERROR),
242  errmsg("subscription with slot_name = NONE must also set create_slot = false")));
243  }
244 }
245 
246 /*
247  * Auxiliary function to return a text array out of a list of String nodes.
248  */
249 static Datum
251 {
252  ArrayType *arr;
253  Datum *datums;
254  int j = 0;
255  ListCell *cell;
256  MemoryContext memcxt;
257  MemoryContext oldcxt;
258 
259  /* Create memory context for temporary allocations. */
261  "publicationListToArray to array",
265  oldcxt = MemoryContextSwitchTo(memcxt);
266 
267  datums = palloc(sizeof(text *) * list_length(publist));
268  foreach(cell, publist)
269  {
270  char *name = strVal(lfirst(cell));
271  ListCell *pcell;
272 
273  /* Check for duplicates. */
274  foreach(pcell, publist)
275  {
276  char *pname = strVal(lfirst(pcell));
277 
278  if (name == pname)
279  break;
280 
281  if (strcmp(name, pname) == 0)
282  ereport(ERROR,
283  (errcode(ERRCODE_SYNTAX_ERROR),
284  errmsg("publication name \"%s\" used more than once",
285  pname)));
286  }
287 
288  datums[j++] = CStringGetTextDatum(name);
289  }
290 
291  MemoryContextSwitchTo(oldcxt);
292 
293  arr = construct_array(datums, list_length(publist),
294  TEXTOID, -1, false, 'i');
295  MemoryContextDelete(memcxt);
296 
297  return PointerGetDatum(arr);
298 }
299 
300 /*
301  * Create new subscription.
302  */
305 {
306  Relation rel;
307  ObjectAddress myself;
308  Oid subid;
309  bool nulls[Natts_pg_subscription];
311  Oid owner = GetUserId();
312  HeapTuple tup;
313  bool connect;
314  bool enabled_given;
315  bool enabled;
316  bool copy_data;
317  char *synchronous_commit;
318  char *conninfo;
319  char *slotname;
320  bool slotname_given;
321  char originname[NAMEDATALEN];
322  bool create_slot;
323  List *publications;
324 
325  /*
326  * Parse and check options.
327  *
328  * Connection and publication should not be specified here.
329  */
330  parse_subscription_options(stmt->options, &connect, &enabled_given,
331  &enabled, &create_slot, &slotname_given,
332  &slotname, &copy_data, &synchronous_commit,
333  NULL);
334 
335  /*
336  * Since creating a replication slot is not transactional, rolling back
337  * the transaction leaves the created replication slot. So we cannot run
338  * CREATE SUBSCRIPTION inside a transaction block if creating a
339  * replication slot.
340  */
341  if (create_slot)
342  PreventTransactionChain(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
343 
344  if (!superuser())
345  ereport(ERROR,
346  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
347  (errmsg("must be superuser to create subscriptions"))));
348 
350 
351  /* Check if name is used */
353  CStringGetDatum(stmt->subname));
354  if (OidIsValid(subid))
355  {
356  ereport(ERROR,
358  errmsg("subscription \"%s\" already exists",
359  stmt->subname)));
360  }
361 
362  if (!slotname_given && slotname == NULL)
363  slotname = stmt->subname;
364 
365  /* The default for synchronous_commit of subscriptions is off. */
366  if (synchronous_commit == NULL)
367  synchronous_commit = "off";
368 
369  conninfo = stmt->conninfo;
370  publications = stmt->publication;
371 
372  /* Load the library providing us libpq calls. */
373  load_file("libpqwalreceiver", false);
374 
375  /* Check the connection info string. */
376  walrcv_check_conninfo(conninfo);
377 
378  /* Everything ok, form a new tuple. */
379  memset(values, 0, sizeof(values));
380  memset(nulls, false, sizeof(nulls));
381 
383  values[Anum_pg_subscription_subname - 1] =
385  values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
386  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
388  CStringGetTextDatum(conninfo);
389  if (slotname)
392  else
393  nulls[Anum_pg_subscription_subslotname - 1] = true;
395  CStringGetTextDatum(synchronous_commit);
397  publicationListToArray(publications);
398 
399  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
400 
401  /* Insert tuple into catalog. */
402  subid = CatalogTupleInsert(rel, tup);
403  heap_freetuple(tup);
404 
406 
407  snprintf(originname, sizeof(originname), "pg_%u", subid);
408  replorigin_create(originname);
409 
410  /*
411  * Connect to remote side to execute requested commands and fetch table
412  * info.
413  */
414  if (connect)
415  {
416  XLogRecPtr lsn;
417  char *err;
419  List *tables;
420  ListCell *lc;
421  char table_state;
422 
423  /* Try to connect to the publisher. */
424  wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
425  if (!wrconn)
426  ereport(ERROR,
427  (errmsg("could not connect to the publisher: %s", err)));
428 
429  PG_TRY();
430  {
431  /*
432  * Set sync state based on if we were asked to do data copy or
433  * not.
434  */
435  table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
436 
437  /*
438  * Get the table list from publisher and build local table status
439  * info.
440  */
441  tables = fetch_table_list(wrconn, publications);
442  foreach(lc, tables)
443  {
444  RangeVar *rv = (RangeVar *) lfirst(lc);
445  Oid relid;
446 
447  relid = RangeVarGetRelid(rv, AccessShareLock, false);
448 
449  /* Check for supported relkind. */
451  rv->schemaname, rv->relname);
452 
453  SetSubscriptionRelState(subid, relid, table_state,
454  InvalidXLogRecPtr, false);
455  }
456 
457  /*
458  * If requested, create permanent slot for the subscription. We
459  * won't use the initial snapshot for anything, so no need to
460  * export it.
461  */
462  if (create_slot)
463  {
464  Assert(slotname);
465 
466  walrcv_create_slot(wrconn, slotname, false,
467  CRS_NOEXPORT_SNAPSHOT, &lsn);
468  ereport(NOTICE,
469  (errmsg("created replication slot \"%s\" on publisher",
470  slotname)));
471  }
472  }
473  PG_CATCH();
474  {
475  /* Close the connection in case of failure. */
476  walrcv_disconnect(wrconn);
477  PG_RE_THROW();
478  }
479  PG_END_TRY();
480 
481  /* And we are done with the remote side. */
482  walrcv_disconnect(wrconn);
483  }
484  else
486  (errmsg("tables were not subscribed, you will have to run "
487  "ALTER SUBSCRIPTION ... REFRESH PUBLICATION to "
488  "subscribe the tables")));
489 
491 
492  if (enabled)
494 
496 
498 
499  return myself;
500 }
501 
502 static void
504 {
505  char *err;
506  List *pubrel_names;
507  List *subrel_states;
508  Oid *subrel_local_oids;
509  Oid *pubrel_local_oids;
510  ListCell *lc;
511  int off;
512 
513  /* Load the library providing us libpq calls. */
514  load_file("libpqwalreceiver", false);
515 
516  /* Try to connect to the publisher. */
517  wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
518  if (!wrconn)
519  ereport(ERROR,
520  (errmsg("could not connect to the publisher: %s", err)));
521 
522  /* Get the table list from publisher. */
523  pubrel_names = fetch_table_list(wrconn, sub->publications);
524 
525  /* We are done with the remote side, close connection. */
527 
528  /* Get local table list. */
529  subrel_states = GetSubscriptionRelations(sub->oid);
530 
531  /*
532  * Build qsorted array of local table oids for faster lookup. This can
533  * potentially contain all tables in the database so speed of lookup is
534  * important.
535  */
536  subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
537  off = 0;
538  foreach(lc, subrel_states)
539  {
541 
542  subrel_local_oids[off++] = relstate->relid;
543  }
544  qsort(subrel_local_oids, list_length(subrel_states),
545  sizeof(Oid), oid_cmp);
546 
547  /*
548  * Walk over the remote tables and try to match them to locally known
549  * tables. If the table is not known locally create a new state for it.
550  *
551  * Also builds array of local oids of remote tables for the next step.
552  */
553  off = 0;
554  pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
555 
556  foreach(lc, pubrel_names)
557  {
558  RangeVar *rv = (RangeVar *) lfirst(lc);
559  Oid relid;
560 
561  relid = RangeVarGetRelid(rv, AccessShareLock, false);
562 
563  /* Check for supported relkind. */
565  rv->schemaname, rv->relname);
566 
567  pubrel_local_oids[off++] = relid;
568 
569  if (!bsearch(&relid, subrel_local_oids,
570  list_length(subrel_states), sizeof(Oid), oid_cmp))
571  {
572  SetSubscriptionRelState(sub->oid, relid,
574  InvalidXLogRecPtr, false);
575  ereport(DEBUG1,
576  (errmsg("table \"%s.%s\" added to subscription \"%s\"",
577  rv->schemaname, rv->relname, sub->name)));
578  }
579  }
580 
581  /*
582  * Next remove state for tables we should not care about anymore using the
583  * data we collected above
584  */
585  qsort(pubrel_local_oids, list_length(pubrel_names),
586  sizeof(Oid), oid_cmp);
587 
588  for (off = 0; off < list_length(subrel_states); off++)
589  {
590  Oid relid = subrel_local_oids[off];
591 
592  if (!bsearch(&relid, pubrel_local_oids,
593  list_length(pubrel_names), sizeof(Oid), oid_cmp))
594  {
595  RemoveSubscriptionRel(sub->oid, relid);
596 
598 
599  ereport(DEBUG1,
600  (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
602  get_rel_name(relid),
603  sub->name)));
604  }
605  }
606 }
607 
608 /*
609  * Alter the existing subscription.
610  */
613 {
614  Relation rel;
615  ObjectAddress myself;
616  bool nulls[Natts_pg_subscription];
617  bool replaces[Natts_pg_subscription];
619  HeapTuple tup;
620  Oid subid;
621  bool update_tuple = false;
622  Subscription *sub;
623 
625 
626  /* Fetch the existing tuple. */
628  CStringGetDatum(stmt->subname));
629 
630  if (!HeapTupleIsValid(tup))
631  ereport(ERROR,
632  (errcode(ERRCODE_UNDEFINED_OBJECT),
633  errmsg("subscription \"%s\" does not exist",
634  stmt->subname)));
635 
636  /* must be owner */
639  stmt->subname);
640 
641  subid = HeapTupleGetOid(tup);
642  sub = GetSubscription(subid, false);
643 
644  /* Lock the subscription so nobody else can do anything with it. */
646 
647  /* Form a new tuple. */
648  memset(values, 0, sizeof(values));
649  memset(nulls, false, sizeof(nulls));
650  memset(replaces, false, sizeof(replaces));
651 
652  switch (stmt->kind)
653  {
655  {
656  char *slotname;
657  bool slotname_given;
658  char *synchronous_commit;
659 
661  NULL, &slotname_given, &slotname,
662  NULL, &synchronous_commit, NULL);
663 
664  if (slotname_given)
665  {
666  if (sub->enabled && !slotname)
667  ereport(ERROR,
668  (errcode(ERRCODE_SYNTAX_ERROR),
669  errmsg("cannot set slot_name = NONE for enabled subscription")));
670 
671  if (slotname)
674  else
675  nulls[Anum_pg_subscription_subslotname - 1] = true;
676  replaces[Anum_pg_subscription_subslotname - 1] = true;
677  }
678 
679  if (synchronous_commit)
680  {
682  CStringGetTextDatum(synchronous_commit);
683  replaces[Anum_pg_subscription_subsynccommit - 1] = true;
684  }
685 
686  update_tuple = true;
687  break;
688  }
689 
691  {
692  bool enabled,
693  enabled_given;
694 
696  &enabled_given, &enabled, NULL,
697  NULL, NULL, NULL, NULL, NULL);
698  Assert(enabled_given);
699 
700  if (!sub->slotname && enabled)
701  ereport(ERROR,
702  (errcode(ERRCODE_SYNTAX_ERROR),
703  errmsg("cannot enable subscription that does not have a slot name")));
704 
705  values[Anum_pg_subscription_subenabled - 1] =
706  BoolGetDatum(enabled);
707  replaces[Anum_pg_subscription_subenabled - 1] = true;
708 
709  if (enabled)
711 
712  update_tuple = true;
713  break;
714  }
715 
717  /* Load the library providing us libpq calls. */
718  load_file("libpqwalreceiver", false);
719  /* Check the connection info string. */
721 
724  replaces[Anum_pg_subscription_subconninfo - 1] = true;
725  update_tuple = true;
726  break;
727 
729  {
730  bool copy_data;
731  bool refresh;
732 
734  NULL, NULL, NULL, &copy_data,
735  NULL, &refresh);
736 
739  replaces[Anum_pg_subscription_subpublications - 1] = true;
740 
741  update_tuple = true;
742 
743  /* Refresh if user asked us to. */
744  if (refresh)
745  {
746  if (!sub->enabled)
747  ereport(ERROR,
748  (errcode(ERRCODE_SYNTAX_ERROR),
749  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
750  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
751 
752  /* Make sure refresh sees the new list of publications. */
753  sub->publications = stmt->publication;
754 
755  AlterSubscription_refresh(sub, copy_data);
756  }
757 
758  break;
759  }
760 
762  {
763  bool copy_data;
764 
765  if (!sub->enabled)
766  ereport(ERROR,
767  (errcode(ERRCODE_SYNTAX_ERROR),
768  errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
769 
771  NULL, NULL, NULL, &copy_data,
772  NULL, NULL);
773 
774  AlterSubscription_refresh(sub, copy_data);
775 
776  break;
777  }
778 
779  default:
780  elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
781  stmt->kind);
782  }
783 
784  /* Update the catalog if needed. */
785  if (update_tuple)
786  {
787  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
788  replaces);
789 
790  CatalogTupleUpdate(rel, &tup->t_self, tup);
791 
792  heap_freetuple(tup);
793  }
794 
796 
798 
800 
801  return myself;
802 }
803 
804 /*
805  * Drop a subscription
806  */
807 void
808 DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
809 {
810  Relation rel;
811  ObjectAddress myself;
812  HeapTuple tup;
813  Oid subid;
814  Datum datum;
815  bool isnull;
816  char *subname;
817  char *conninfo;
818  char *slotname;
819  List *subworkers;
820  ListCell *lc;
821  char originname[NAMEDATALEN];
822  char *err = NULL;
823  RepOriginId originid;
825  StringInfoData cmd;
826 
827  /*
828  * Lock pg_subscription with AccessExclusiveLock to ensure that the
829  * launcher doesn't restart new worker during dropping the subscription
830  */
832 
834  CStringGetDatum(stmt->subname));
835 
836  if (!HeapTupleIsValid(tup))
837  {
838  heap_close(rel, NoLock);
839 
840  if (!stmt->missing_ok)
841  ereport(ERROR,
842  (errcode(ERRCODE_UNDEFINED_OBJECT),
843  errmsg("subscription \"%s\" does not exist",
844  stmt->subname)));
845  else
846  ereport(NOTICE,
847  (errmsg("subscription \"%s\" does not exist, skipping",
848  stmt->subname)));
849 
850  return;
851  }
852 
853  subid = HeapTupleGetOid(tup);
854 
855  /* must be owner */
856  if (!pg_subscription_ownercheck(subid, GetUserId()))
858  stmt->subname);
859 
860  /* DROP hook for the subscription being removed */
862 
863  /*
864  * Lock the subscription so nobody else can do anything with it (including
865  * the replication workers).
866  */
868 
869  /* Get subname */
870  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
872  Assert(!isnull);
873  subname = pstrdup(NameStr(*DatumGetName(datum)));
874 
875  /* Get conninfo */
876  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
878  Assert(!isnull);
879  conninfo = TextDatumGetCString(datum);
880 
881  /* Get slotname */
882  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
884  if (!isnull)
885  slotname = pstrdup(NameStr(*DatumGetName(datum)));
886  else
887  slotname = NULL;
888 
889  /*
890  * Since dropping a replication slot is not transactional, the replication
891  * slot stays dropped even if the transaction rolls back. So we cannot
892  * run DROP SUBSCRIPTION inside a transaction block if dropping the
893  * replication slot.
894  *
895  * XXX The command name should really be something like "DROP SUBSCRIPTION
896  * of a subscription that is associated with a replication slot", but we
897  * don't have the proper facilities for that.
898  */
899  if (slotname)
900  PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION");
901 
902 
904  EventTriggerSQLDropAddObject(&myself, true, true);
905 
906  /* Remove the tuple from catalog. */
907  CatalogTupleDelete(rel, &tup->t_self);
908 
909  ReleaseSysCache(tup);
910 
911  /*
912  * If we are dropping the replication slot, stop all the subscription
913  * workers immediately, so that the slot becomes accessible. Otherwise
914  * just schedule the stopping for the end of the transaction.
915  *
916  * New workers won't be started because we hold an exclusive lock on the
917  * subscription till the end of the transaction.
918  */
919  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
920  subworkers = logicalrep_workers_find(subid, false);
921  LWLockRelease(LogicalRepWorkerLock);
922  foreach(lc, subworkers)
923  {
925 
926  if (slotname)
928  else
930  }
931  list_free(subworkers);
932 
933  /* Clean up dependencies */
935 
936  /* Remove any associated relation synchronization states. */
938 
939  /* Remove the origin tracking if exists. */
940  snprintf(originname, sizeof(originname), "pg_%u", subid);
941  originid = replorigin_by_name(originname, true);
942  if (originid != InvalidRepOriginId)
943  replorigin_drop(originid, false);
944 
945  /*
946  * If there is no slot associated with the subscription, we can finish
947  * here.
948  */
949  if (!slotname)
950  {
951  heap_close(rel, NoLock);
952  return;
953  }
954 
955  /*
956  * Otherwise drop the replication slot at the publisher node using the
957  * replication connection.
958  */
959  load_file("libpqwalreceiver", false);
960 
961  initStringInfo(&cmd);
962  appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
963 
964  wrconn = walrcv_connect(conninfo, true, subname, &err);
965  if (wrconn == NULL)
966  ereport(ERROR,
967  (errmsg("could not connect to publisher when attempting to "
968  "drop the replication slot \"%s\"", slotname),
969  errdetail("The error was: %s", err),
970  errhint("Use ALTER SUBSCRIPTION ... SET (slot_name = NONE) "
971  "to disassociate the subscription from the slot.")));
972 
973  PG_TRY();
974  {
975  WalRcvExecResult *res;
976 
977  res = walrcv_exec(wrconn, cmd.data, 0, NULL);
978 
979  if (res->status != WALRCV_OK_COMMAND)
980  ereport(ERROR,
981  (errmsg("could not drop the replication slot \"%s\" on publisher",
982  slotname),
983  errdetail("The error was: %s", res->err)));
984  else
985  ereport(NOTICE,
986  (errmsg("dropped replication slot \"%s\" on publisher",
987  slotname)));
988 
989  walrcv_clear_result(res);
990  }
991  PG_CATCH();
992  {
993  /* Close the connection in case of failure */
994  walrcv_disconnect(wrconn);
995  PG_RE_THROW();
996  }
997  PG_END_TRY();
998 
999  walrcv_disconnect(wrconn);
1000 
1001  pfree(cmd.data);
1002 
1003  heap_close(rel, NoLock);
1004 }
1005 
1006 /*
1007  * Internal workhorse for changing a subscription owner
1008  */
1009 static void
1011 {
1012  Form_pg_subscription form;
1013 
1014  form = (Form_pg_subscription) GETSTRUCT(tup);
1015 
1016  if (form->subowner == newOwnerId)
1017  return;
1018 
1021  NameStr(form->subname));
1022 
1023  /* New owner must be a superuser */
1024  if (!superuser_arg(newOwnerId))
1025  ereport(ERROR,
1026  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1027  errmsg("permission denied to change owner of subscription \"%s\"",
1028  NameStr(form->subname)),
1029  errhint("The owner of a subscription must be a superuser.")));
1030 
1031  form->subowner = newOwnerId;
1032  CatalogTupleUpdate(rel, &tup->t_self, tup);
1033 
1034  /* Update owner dependency reference */
1036  HeapTupleGetOid(tup),
1037  newOwnerId);
1038 
1040  HeapTupleGetOid(tup), 0);
1041 }
1042 
1043 /*
1044  * Change subscription owner -- by name
1045  */
1047 AlterSubscriptionOwner(const char *name, Oid newOwnerId)
1048 {
1049  Oid subid;
1050  HeapTuple tup;
1051  Relation rel;
1052  ObjectAddress address;
1053 
1055 
1057  CStringGetDatum(name));
1058 
1059  if (!HeapTupleIsValid(tup))
1060  ereport(ERROR,
1061  (errcode(ERRCODE_UNDEFINED_OBJECT),
1062  errmsg("subscription \"%s\" does not exist", name)));
1063 
1064  subid = HeapTupleGetOid(tup);
1065 
1066  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1067 
1068  ObjectAddressSet(address, SubscriptionRelationId, subid);
1069 
1070  heap_freetuple(tup);
1071 
1073 
1074  return address;
1075 }
1076 
1077 /*
1078  * Change subscription owner -- by OID
1079  */
1080 void
1082 {
1083  HeapTuple tup;
1084  Relation rel;
1085 
1087 
1089 
1090  if (!HeapTupleIsValid(tup))
1091  ereport(ERROR,
1092  (errcode(ERRCODE_UNDEFINED_OBJECT),
1093  errmsg("subscription with OID %u does not exist", subid)));
1094 
1095  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1096 
1097  heap_freetuple(tup);
1098 
1100 }
1101 
1102 /*
1103  * Get the list of tables which belong to specified publications on the
1104  * publisher connection.
1105  */
1106 static List *
1108 {
1109  WalRcvExecResult *res;
1110  StringInfoData cmd;
1111  TupleTableSlot *slot;
1112  Oid tableRow[2] = {TEXTOID, TEXTOID};
1113  ListCell *lc;
1114  bool first;
1115  List *tablelist = NIL;
1116 
1117  Assert(list_length(publications) > 0);
1118 
1119  initStringInfo(&cmd);
1120  appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
1121  " FROM pg_catalog.pg_publication_tables t\n"
1122  " WHERE t.pubname IN (");
1123  first = true;
1124  foreach(lc, publications)
1125  {
1126  char *pubname = strVal(lfirst(lc));
1127 
1128  if (first)
1129  first = false;
1130  else
1131  appendStringInfoString(&cmd, ", ");
1132 
1134  }
1135  appendStringInfoChar(&cmd, ')');
1136 
1137  res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
1138  pfree(cmd.data);
1139 
1140  if (res->status != WALRCV_OK_TUPLES)
1141  ereport(ERROR,
1142  (errmsg("could not receive list of replicated tables from the publisher: %s",
1143  res->err)));
1144 
1145  /* Process tables. */
1146  slot = MakeSingleTupleTableSlot(res->tupledesc);
1147  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1148  {
1149  char *nspname;
1150  char *relname;
1151  bool isnull;
1152  RangeVar *rv;
1153 
1154  nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1155  Assert(!isnull);
1156  relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1157  Assert(!isnull);
1158 
1159  rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
1160  tablelist = lappend(tablelist, rv);
1161 
1162  ExecClearTuple(slot);
1163  }
1165 
1166  walrcv_clear_result(res);
1167 
1168  return tablelist;
1169 }
#define connect(s, name, namelen)
Definition: win32.h:373
#define NIL
Definition: pg_list.h:69
#define SUBREL_STATE_INIT
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
WalReceiverConn * wrconn
Definition: worker.c:110
ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:265
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:46
#define DEBUG1
Definition: elog.h:25
int errhint(const char *fmt,...)
Definition: elog.c:987
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
#define Anum_pg_subscription_subpublications
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:145
void RemoveSubscriptionRel(Oid subid, Oid relid)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10381
#define Anum_pg_subscription_subowner
#define RelationGetDescr(relation)
Definition: rel.h:428
Oid GetUserId(void)
Definition: miscinit.c:284
#define TEXTOID
Definition: pg_type.h:324
#define PointerGetDatum(X)
Definition: postgres.h:562
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:53
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:244
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1801
char * pstrdup(const char *in)
Definition: mcxt.c:1077
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1750
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:332
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:154
TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: execTuples.c:439
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ArrayType * construct_array(Datum *elems, int nelems, Oid elmtype, int elmlen, bool elmbyval, char elmalign)
Definition: arrayfuncs.c:3306
#define AccessShareLock
Definition: lockdefs.h:36
uint16 RepOriginId
Definition: xlogdefs.h:51
#define strVal(v)
Definition: value.h:54
int errcode(int sqlerrcode)
Definition: elog.c:575
bool superuser(void)
Definition: superuser.c:47
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:255
static Datum publicationListToArray(List *publist)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
#define heap_close(r, l)
Definition: heapam.h:97
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:584
FormData_pg_subscription * Form_pg_subscription
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:159
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1372
unsigned int Oid
Definition: postgres_ext.h:31
ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:260
#define OidIsValid(objectId)
Definition: c.h:538
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:211
#define ALLOCSET_DEFAULT_MINSIZE
Definition: memutils.h:162
#define GetSysCacheOid2(cacheId, key1, key2)
Definition: syscache.h:185
char * schemaname
Definition: primnodes.h:67
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1715
#define NAMEDATALEN
#define DatumGetName(X)
Definition: postgres.h:591
char * relname
Definition: primnodes.h:68
Subscription * GetSubscription(Oid subid, bool missing_ok)
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:304
bool defGetBoolean(DefElem *def)
Definition: define.c:111
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:268
void pfree(void *pointer)
Definition: mcxt.c:950
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:78
TupleDesc tupledesc
Definition: walreceiver.h:190
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
Oid CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:162
#define Natts_pg_subscription
#define Anum_pg_subscription_subname
char * defGetString(DefElem *def)
Definition: define.c:49
ItemPointerData t_self
Definition: htup.h:65
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:157
#define SubscriptionRelationId
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3033
#define NoLock
Definition: lockdefs.h:34
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:216
AlterSubscriptionType kind
Definition: parsenodes.h:3412
void aclcheck_error(AclResult aclerr, AclObjectKind objectkind, const char *objectname)
Definition: aclchk.c:3399
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:447
#define RowExclusiveLock
Definition: lockdefs.h:38
int errdetail(const char *fmt,...)
Definition: elog.c:873
List * GetSubscriptionRelations(Oid subid)
#define CStringGetDatum(X)
Definition: postgres.h:584
List * publications
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc)
Definition: execTuples.c:199
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
int synchronous_commit
Definition: xact.c:82
#define ereport(elevel, rest)
Definition: elog.h:122
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:163
bool superuser_arg(Oid roleid)
Definition: superuser.c:57
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:823
#define Anum_pg_subscription_subslotname
List * lappend(List *list, void *datum)
Definition: list.c:128
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:169
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define Anum_pg_subscription_subdbid
#define WARNING
Definition: elog.h:40
#define TextDatumGetCString(d)
Definition: builtins.h:92
Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool update_only)
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
uintptr_t Datum
Definition: postgres.h:372
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1117
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1279
Oid MyDatabaseId
Definition: globals.c:77
Relation heap_open(Oid relationId, LOCKMODE lockmode)
Definition: heapam.c:1290
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:871
#define BoolGetDatum(X)
Definition: postgres.h:408
#define InvalidOid
Definition: postgres_ext.h:36
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define NOTICE
Definition: elog.h:37
#define Anum_pg_subscription_subconninfo
#define PG_CATCH()
Definition: elog.h:293
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
Tuplestorestate * tuplestore
Definition: walreceiver.h:189
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:676
#define lfirst(lc)
Definition: pg_list.h:106
WalRcvExecStatus status
Definition: walreceiver.h:187
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:137
int oid_cmp(const void *p1, const void *p2)
Definition: oid.c:336
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:210
static int list_length(const List *l)
Definition: pg_list.h:89
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1111
#define PG_RE_THROW()
Definition: elog.h:314
#define walrcv_disconnect(conn)
Definition: walreceiver.h:264
const char * name
Definition: encode.c:521
#define InvalidRepOriginId
Definition: origin.h:34
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
#define Anum_pg_subscription_subenabled
static Datum values[MAXATTR]
Definition: bootstrap.c:163
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:165
#define AccessExclusiveLock
Definition: lockdefs.h:45
void * palloc(Size size)
Definition: mcxt.c:849
int errmsg(const char *fmt,...)
Definition: elog.c:797
void list_free(List *list)
Definition: list.c:1133
#define ALLOCSET_DEFAULT_INITSIZE
Definition: memutils.h:163
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: heaptuple.c:1141
#define NameStr(name)
Definition: c.h:499
#define CStringGetTextDatum(s)
Definition: builtins.h:91
Definition: c.h:439
char * defname
Definition: parsenodes.h:719
#define ALLOCSET_DEFAULT_MAXSIZE
Definition: memutils.h:164
void logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
Definition: launcher.c:552
#define elog
Definition: elog.h:219
#define HeapTupleGetOid(tuple)
Definition: htup_details.h:695
#define qsort(a, b, c, d)
Definition: port.h:443
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:856
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:167
#define PG_TRY()
Definition: elog.h:284
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:791
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition: guc.c:5919
Definition: pg_list.h:45
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1726
#define SUBREL_STATE_READY
#define Anum_pg_subscription_subsynccommit
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition: makefuncs.c:419
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh)
#define PG_END_TRY()
Definition: elog.h:300
static void AlterSubscription_refresh(Subscription *sub, bool copy_data)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:262
void PreventTransactionChain(bool isTopLevel, const char *stmtType)
Definition: xact.c:3157
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5110
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:242
#define SearchSysCache2(cacheId, key1, key2)
Definition: syscache.h:158