PostgreSQL Source Code  git master
subscriptioncmds.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * subscriptioncmds.c
4  * subscription catalog manipulation functions
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  * subscriptioncmds.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/htup_details.h"
18 #include "access/table.h"
19 #include "access/xact.h"
20 #include "catalog/catalog.h"
21 #include "catalog/dependency.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/objectaccess.h"
25 #include "catalog/objectaddress.h"
28 #include "catalog/pg_type.h"
29 #include "commands/defrem.h"
30 #include "commands/event_trigger.h"
32 #include "executor/executor.h"
33 #include "miscadmin.h"
34 #include "nodes/makefuncs.h"
36 #include "replication/origin.h"
38 #include "replication/walsender.h"
40 #include "storage/lmgr.h"
41 #include "utils/acl.h"
42 #include "utils/builtins.h"
43 #include "utils/guc.h"
44 #include "utils/lsyscache.h"
45 #include "utils/memutils.h"
46 #include "utils/syscache.h"
47 
48 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
49 
50 /*
51  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
52  *
53  * Since not all options can be specified in both commands, this function
54  * will report an error on options if the target output pointer is NULL to
55  * accommodate that.
56  */
/*
 * Option parser shared by the CREATE and ALTER SUBSCRIPTION paths.
 *
 * Each output pointer is optional.  For every non-NULL output a default is
 * stored first (connect, enabled, create_slot, copy_data, refresh -> true;
 * binary, streaming -> false; slot_name, synchronous_commit -> NULL), then
 * the option list is scanned and matching entries overwrite the default.
 *
 * An option whose output pointer is NULL never matches its branch and so
 * ends in the final "unrecognized subscription parameter" error; an option
 * that appears twice raises "conflicting or redundant options".
 *
 * The *_given pointers are dereferenced whenever their paired value pointer
 * is non-NULL, so each pair has to be passed (or omitted) together.
 *
 * NOTE(review): the listing line that carries the function name and the
 * leading "options" parameter is absent between the next two lines.
 */
57 static void
59  bool *connect,
60  bool *enabled_given, bool *enabled,
61  bool *create_slot,
62  bool *slot_name_given, char **slot_name,
63  bool *copy_data,
64  char **synchronous_commit,
65  bool *refresh,
66  bool *binary_given, bool *binary,
67  bool *streaming_given, bool *streaming)
68 {
69  ListCell *lc;
70  bool connect_given = false;
71  bool create_slot_given = false;
72  bool copy_data_given = false;
73  bool refresh_given = false;
74 
75  /* If connect is specified, the others also need to be. */
76  Assert(!connect || (enabled && create_slot && copy_data));
77 
 /* Seed every requested output with its default before scanning. */
78  if (connect)
79  *connect = true;
80  if (enabled)
81  {
82  *enabled_given = false;
83  *enabled = true;
84  }
85  if (create_slot)
86  *create_slot = true;
87  if (slot_name)
88  {
89  *slot_name_given = false;
90  *slot_name = NULL;
91  }
92  if (copy_data)
93  *copy_data = true;
94  if (synchronous_commit)
95  *synchronous_commit = NULL;
96  if (refresh)
97  *refresh = true;
98  if (binary)
99  {
100  *binary_given = false;
101  *binary = false;
102  }
103  if (streaming)
104  {
105  *streaming_given = false;
106  *streaming = false;
107  }
108 
109  /* Parse options */
 /* The "&& pointer" half of each test is what rejects unsupported options. */
110  foreach(lc, options)
111  {
112  DefElem *defel = (DefElem *) lfirst(lc);
113 
114  if (strcmp(defel->defname, "connect") == 0 && connect)
115  {
116  if (connect_given)
117  ereport(ERROR,
118  (errcode(ERRCODE_SYNTAX_ERROR),
119  errmsg("conflicting or redundant options")));
120 
121  connect_given = true;
122  *connect = defGetBoolean(defel);
123  }
124  else if (strcmp(defel->defname, "enabled") == 0 && enabled)
125  {
126  if (*enabled_given)
127  ereport(ERROR,
128  (errcode(ERRCODE_SYNTAX_ERROR),
129  errmsg("conflicting or redundant options")));
130 
131  *enabled_given = true;
132  *enabled = defGetBoolean(defel);
133  }
134  else if (strcmp(defel->defname, "create_slot") == 0 && create_slot)
135  {
136  if (create_slot_given)
137  ereport(ERROR,
138  (errcode(ERRCODE_SYNTAX_ERROR),
139  errmsg("conflicting or redundant options")));
140 
141  create_slot_given = true;
142  *create_slot = defGetBoolean(defel);
143  }
144  else if (strcmp(defel->defname, "slot_name") == 0 && slot_name)
145  {
146  if (*slot_name_given)
147  ereport(ERROR,
148  (errcode(ERRCODE_SYNTAX_ERROR),
149  errmsg("conflicting or redundant options")));
150 
151  *slot_name_given = true;
152  *slot_name = defGetString(defel);
153 
154  /* Setting slot_name = NONE is treated as no slot name. */
155  if (strcmp(*slot_name, "none") == 0)
156  *slot_name = NULL;
157  }
158  else if (strcmp(defel->defname, "copy_data") == 0 && copy_data)
159  {
160  if (copy_data_given)
161  ereport(ERROR,
162  (errcode(ERRCODE_SYNTAX_ERROR),
163  errmsg("conflicting or redundant options")));
164 
165  copy_data_given = true;
166  *copy_data = defGetBoolean(defel);
167  }
168  else if (strcmp(defel->defname, "synchronous_commit") == 0 &&
169  synchronous_commit)
170  {
 /* There is no separate flag: a non-NULL value means "already seen". */
171  if (*synchronous_commit)
172  ereport(ERROR,
173  (errcode(ERRCODE_SYNTAX_ERROR),
174  errmsg("conflicting or redundant options")));
175 
176  *synchronous_commit = defGetString(defel);
177 
178  /* Test if the given value is valid for synchronous_commit GUC. */
 /* NOTE(review): one argument line of this call is absent from the listing. */
179  (void) set_config_option("synchronous_commit", *synchronous_commit,
181  false, 0, false);
182  }
183  else if (strcmp(defel->defname, "refresh") == 0 && refresh)
184  {
185  if (refresh_given)
186  ereport(ERROR,
187  (errcode(ERRCODE_SYNTAX_ERROR),
188  errmsg("conflicting or redundant options")));
189 
190  refresh_given = true;
191  *refresh = defGetBoolean(defel);
192  }
193  else if (strcmp(defel->defname, "binary") == 0 && binary)
194  {
195  if (*binary_given)
196  ereport(ERROR,
197  (errcode(ERRCODE_SYNTAX_ERROR),
198  errmsg("conflicting or redundant options")));
199 
200  *binary_given = true;
201  *binary = defGetBoolean(defel);
202  }
203  else if (strcmp(defel->defname, "streaming") == 0 && streaming)
204  {
205  if (*streaming_given)
206  ereport(ERROR,
207  (errcode(ERRCODE_SYNTAX_ERROR),
208  errmsg("conflicting or redundant options")));
209 
210  *streaming_given = true;
211  *streaming = defGetBoolean(defel);
212  }
213  else
214  ereport(ERROR,
215  (errcode(ERRCODE_SYNTAX_ERROR),
216  errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
217  }
218 
219  /*
220  * We've been explicitly asked to not connect, that requires some
221  * additional processing.
222  */
223  if (connect && !*connect)
224  {
225  /* Check for incompatible options from the user. */
226  if (enabled && *enabled_given && *enabled)
227  ereport(ERROR,
228  (errcode(ERRCODE_SYNTAX_ERROR),
229  /*- translator: both %s are strings of the form "option = value" */
230  errmsg("%s and %s are mutually exclusive options",
231  "connect = false", "enabled = true")));
232 
233  if (create_slot && create_slot_given && *create_slot)
234  ereport(ERROR,
235  (errcode(ERRCODE_SYNTAX_ERROR),
236  errmsg("%s and %s are mutually exclusive options",
237  "connect = false", "create_slot = true")));
238 
239  if (copy_data && copy_data_given && *copy_data)
240  ereport(ERROR,
241  (errcode(ERRCODE_SYNTAX_ERROR),
242  errmsg("%s and %s are mutually exclusive options",
243  "connect = false", "copy_data = true")));
244 
245  /* Change the defaults of other options. */
 /* Unguarded stores: only safe because of the Assert at the top. */
246  *enabled = false;
247  *create_slot = false;
248  *copy_data = false;
249  }
250 
251  /*
252  * Do additional checking for disallowed combination when slot_name = NONE
253  * was used.
254  */
255  if (slot_name && *slot_name_given && !*slot_name)
256  {
257  if (enabled && *enabled_given && *enabled)
258  ereport(ERROR,
259  (errcode(ERRCODE_SYNTAX_ERROR),
260  /*- translator: both %s are strings of the form "option = value" */
261  errmsg("%s and %s are mutually exclusive options",
262  "slot_name = NONE", "enabled = true")));
263 
264  if (create_slot && create_slot_given && *create_slot)
265  ereport(ERROR,
266  (errcode(ERRCODE_SYNTAX_ERROR),
267  errmsg("%s and %s are mutually exclusive options",
268  "slot_name = NONE", "create_slot = true")));
269 
 /*
  * The next two checks catch values still sitting at their default of
  * true: the user has to spell out "= false" explicitly.
  */
270  if (enabled && !*enabled_given && *enabled)
271  ereport(ERROR,
272  (errcode(ERRCODE_SYNTAX_ERROR),
273  /*- translator: both %s are strings of the form "option = value" */
274  errmsg("subscription with %s must also set %s",
275  "slot_name = NONE", "enabled = false")));
276 
277  if (create_slot && !create_slot_given && *create_slot)
278  ereport(ERROR,
279  (errcode(ERRCODE_SYNTAX_ERROR),
280  errmsg("subscription with %s must also set %s",
281  "slot_name = NONE", "create_slot = false")));
282  }
283 }
284 
285 /*
286  * Auxiliary function to build a text array out of a list of String nodes.
287  */
/*
 * Convert publist (a List of String nodes) into a text[] Datum.
 *
 * Raises ERRCODE_SYNTAX_ERROR if the same publication name appears more
 * than once.  The per-element text datums are built inside a private
 * memory context that is deleted before returning; the array itself is
 * built after switching back to the caller's context.
 *
 * NOTE(review): the name/parameter line and parts of the context-creation
 * call are absent from this listing.
 */
288 static Datum
290 {
291  ArrayType *arr;
292  Datum *datums;
293  int j = 0;
294  ListCell *cell;
295  MemoryContext memcxt;
296  MemoryContext oldcxt;
297 
298  /* Create memory context for temporary allocations. */
300  "publicationListToArray to array",
302  oldcxt = MemoryContextSwitchTo(memcxt);
303 
304  datums = (Datum *) palloc(sizeof(Datum) * list_length(publist));
305 
306  foreach(cell, publist)
307  {
308  char *name = strVal(lfirst(cell));
309  ListCell *pcell;
310 
311  /* Check for duplicates. */
 /* Only entries before the current one are compared (quadratic scan). */
312  foreach(pcell, publist)
313  {
314  char *pname = strVal(lfirst(pcell));
315 
316  if (pcell == cell)
317  break;
318 
319  if (strcmp(name, pname) == 0)
320  ereport(ERROR,
321  (errcode(ERRCODE_SYNTAX_ERROR),
322  errmsg("publication name \"%s\" used more than once",
323  pname)));
324  }
325 
326  datums[j++] = CStringGetTextDatum(name);
327  }
328 
 /* Leave the scratch context first so the result outlives its deletion. */
329  MemoryContextSwitchTo(oldcxt);
330 
331  arr = construct_array(datums, list_length(publist),
332  TEXTOID, -1, false, TYPALIGN_INT);
333 
334  MemoryContextDelete(memcxt);
335 
336  return PointerGetDatum(arr);
337 }
338 
339 /*
340  * Create new subscription.
341  */
/*
 * Body of the CREATE SUBSCRIPTION implementation (the return-type and
 * name lines are absent from this listing).
 *
 * Visible steps, in order:
 *   1. parse the WITH options ("refresh" is not accepted here);
 *   2. refuse to run inside a transaction block when a slot is to be created;
 *   3. require superuser;
 *   4. reject a name that already exists;
 *   5. default the slot name to the subscription name and
 *      synchronous_commit to "off";
 *   6. validate conninfo, build and insert the pg_subscription row, record
 *      the owner dependency and create the origin "pg_<subid>";
 *   7. if connect is true: connect to the publisher, register every
 *      published table, optionally create the slot, and always disconnect
 *      through PG_FINALLY; otherwise emit a message telling the user to run
 *      ALTER SUBSCRIPTION ... REFRESH PUBLICATION.
 *
 * Returns the ObjectAddress of the new subscription.
 */
344 {
345  Relation rel;
346  ObjectAddress myself;
347  Oid subid;
348  bool nulls[Natts_pg_subscription];
349  Datum values[Natts_pg_subscription];
350  Oid owner = GetUserId();
351  HeapTuple tup;
352  bool connect;
353  bool enabled_given;
354  bool enabled;
355  bool copy_data;
356  bool streaming;
357  bool streaming_given;
358  char *synchronous_commit;
359  char *conninfo;
360  char *slotname;
361  bool slotname_given;
362  bool binary;
363  bool binary_given;
364  char originname[NAMEDATALEN];
365  bool create_slot;
366  List *publications;
367 
368  /*
369  * Parse and check options.
370  *
371  * Connection and publication should not be specified here.
372  */
374  &connect,
375  &enabled_given, &enabled,
376  &create_slot,
377  &slotname_given, &slotname,
378  &copy_data,
379  &synchronous_commit,
380  NULL, /* no "refresh" */
381  &binary_given, &binary,
382  &streaming_given, &streaming);
383 
384  /*
385  * Since creating a replication slot is not transactional, rolling back
386  * the transaction leaves the created replication slot. So we cannot run
387  * CREATE SUBSCRIPTION inside a transaction block if creating a
388  * replication slot.
389  */
390  if (create_slot)
391  PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
392 
393  if (!superuser())
394  ereport(ERROR,
395  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
396  errmsg("must be superuser to create subscriptions")));
397 
398  /*
399  * If built with appropriate switch, whine when regression-testing
400  * conventions for subscription names are violated.
401  */
402 #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
403  if (strncmp(stmt->subname, "regress_", 8) != 0)
404  elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
405 #endif
406 
407  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
408 
409  /* Check if name is used */
410  subid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
412  if (OidIsValid(subid))
413  {
414  ereport(ERROR,
416  errmsg("subscription \"%s\" already exists",
417  stmt->subname)));
418  }
419 
 /* No slot_name option at all: fall back to the subscription's own name. */
420  if (!slotname_given && slotname == NULL)
421  slotname = stmt->subname;
422 
423  /* The default for synchronous_commit of subscriptions is off. */
424  if (synchronous_commit == NULL)
425  synchronous_commit = "off";
426 
427  conninfo = stmt->conninfo;
428  publications = stmt->publication;
429 
430  /* Load the library providing us libpq calls. */
431  load_file("libpqwalreceiver", false);
432 
433  /* Check the connection info string. */
434  walrcv_check_conninfo(conninfo);
435 
436  /* Everything ok, form a new tuple. */
437  memset(values, 0, sizeof(values));
438  memset(nulls, false, sizeof(nulls));
439 
441  Anum_pg_subscription_oid);
442  values[Anum_pg_subscription_oid - 1] = ObjectIdGetDatum(subid);
443  values[Anum_pg_subscription_subdbid - 1] = ObjectIdGetDatum(MyDatabaseId);
444  values[Anum_pg_subscription_subname - 1] =
446  values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
447  values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
448  values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
449  values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
450  values[Anum_pg_subscription_subconninfo - 1] =
451  CStringGetTextDatum(conninfo);
 /* slot_name = NONE is stored as a NULL column. */
452  if (slotname)
453  values[Anum_pg_subscription_subslotname - 1] =
455  else
456  nulls[Anum_pg_subscription_subslotname - 1] = true;
457  values[Anum_pg_subscription_subsynccommit - 1] =
458  CStringGetTextDatum(synchronous_commit);
459  values[Anum_pg_subscription_subpublications - 1] =
460  publicationListToArray(publications);
461 
462  tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
463 
464  /* Insert tuple into catalog. */
465  CatalogTupleInsert(rel, tup);
466  heap_freetuple(tup);
467 
468  recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
469 
 /* The origin is named after the subscription's OID. */
470  snprintf(originname, sizeof(originname), "pg_%u", subid);
471  replorigin_create(originname);
472 
473  /*
474  * Connect to remote side to execute requested commands and fetch table
475  * info.
476  */
477  if (connect)
478  {
479  char *err;
481  List *tables;
482  ListCell *lc;
483  char table_state;
484 
485  /* Try to connect to the publisher. */
 /* NOTE(review): this error is raised without an explicit errcode(). */
486  wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
487  if (!wrconn)
488  ereport(ERROR,
489  (errmsg("could not connect to the publisher: %s", err)));
490 
491  PG_TRY();
492  {
493  /*
494  * Set sync state based on if we were asked to do data copy or
495  * not.
496  */
497  table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
498 
499  /*
500  * Get the table list from publisher and build local table status
501  * info.
502  */
503  tables = fetch_table_list(wrconn, publications);
504  foreach(lc, tables)
505  {
506  RangeVar *rv = (RangeVar *) lfirst(lc);
507  Oid relid;
508 
 /* missing_ok is false, so an unknown local table is an error here. */
509  relid = RangeVarGetRelid(rv, AccessShareLock, false);
510 
511  /* Check for supported relkind. */
513  rv->schemaname, rv->relname);
514 
515  AddSubscriptionRelState(subid, relid, table_state,
517  }
518 
519  /*
520  * If requested, create permanent slot for the subscription. We
521  * won't use the initial snapshot for anything, so no need to
522  * export it.
523  */
524  if (create_slot)
525  {
526  Assert(slotname);
527 
528  walrcv_create_slot(wrconn, slotname, false,
529  CRS_NOEXPORT_SNAPSHOT, NULL);
530  ereport(NOTICE,
531  (errmsg("created replication slot \"%s\" on publisher",
532  slotname)));
533  }
534  }
 /* Runs on both the success and the error path. */
535  PG_FINALLY();
536  {
537  walrcv_disconnect(wrconn);
538  }
539  PG_END_TRY();
540  }
541  else
543  /* translator: %s is an SQL ALTER statement */
544  (errmsg("tables were not subscribed, you will have to run %s to subscribe the tables",
545  "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
546 
548 
549  if (enabled)
551 
552  ObjectAddressSet(myself, SubscriptionRelationId, subid);
553 
554  InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
555 
556  return myself;
557 }
558 
559 static void
561 {
562  char *err;
563  List *pubrel_names;
564  List *subrel_states;
565  Oid *subrel_local_oids;
566  Oid *pubrel_local_oids;
567  ListCell *lc;
568  int off;
569 
570  /* Load the library providing us libpq calls. */
571  load_file("libpqwalreceiver", false);
572 
573  /* Try to connect to the publisher. */
574  wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
575  if (!wrconn)
576  ereport(ERROR,
577  (errmsg("could not connect to the publisher: %s", err)));
578 
579  /* Get the table list from publisher. */
580  pubrel_names = fetch_table_list(wrconn, sub->publications);
581 
582  /* We are done with the remote side, close connection. */
584 
585  /* Get local table list. */
586  subrel_states = GetSubscriptionRelations(sub->oid);
587 
588  /*
589  * Build qsorted array of local table oids for faster lookup. This can
590  * potentially contain all tables in the database so speed of lookup is
591  * important.
592  */
593  subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
594  off = 0;
595  foreach(lc, subrel_states)
596  {
598 
599  subrel_local_oids[off++] = relstate->relid;
600  }
601  qsort(subrel_local_oids, list_length(subrel_states),
602  sizeof(Oid), oid_cmp);
603 
604  /*
605  * Walk over the remote tables and try to match them to locally known
606  * tables. If the table is not known locally create a new state for it.
607  *
608  * Also builds array of local oids of remote tables for the next step.
609  */
610  off = 0;
611  pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid));
612 
613  foreach(lc, pubrel_names)
614  {
615  RangeVar *rv = (RangeVar *) lfirst(lc);
616  Oid relid;
617 
618  relid = RangeVarGetRelid(rv, AccessShareLock, false);
619 
620  /* Check for supported relkind. */
622  rv->schemaname, rv->relname);
623 
624  pubrel_local_oids[off++] = relid;
625 
626  if (!bsearch(&relid, subrel_local_oids,
627  list_length(subrel_states), sizeof(Oid), oid_cmp))
628  {
629  AddSubscriptionRelState(sub->oid, relid,
630  copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
632  ereport(DEBUG1,
633  (errmsg("table \"%s.%s\" added to subscription \"%s\"",
634  rv->schemaname, rv->relname, sub->name)));
635  }
636  }
637 
638  /*
639  * Next remove state for tables we should not care about anymore using the
640  * data we collected above
641  */
642  qsort(pubrel_local_oids, list_length(pubrel_names),
643  sizeof(Oid), oid_cmp);
644 
645  for (off = 0; off < list_length(subrel_states); off++)
646  {
647  Oid relid = subrel_local_oids[off];
648 
649  if (!bsearch(&relid, pubrel_local_oids,
650  list_length(pubrel_names), sizeof(Oid), oid_cmp))
651  {
652  RemoveSubscriptionRel(sub->oid, relid);
653 
655 
656  ereport(DEBUG1,
657  (errmsg("table \"%s.%s\" removed from subscription \"%s\"",
659  get_rel_name(relid),
660  sub->name)));
661  }
662  }
663 }
664 
665 /*
666  * Alter the existing subscription.
667  */
/*
 * Body of the ALTER SUBSCRIPTION implementation (the return-type/name
 * lines and every "case" label are absent from this listing).
 *
 * Looks the subscription up by name, requires the caller to be its owner,
 * takes an AccessExclusiveLock on the subscription object, and then
 * dispatches on stmt->kind.  Branches that change a catalog column fill
 * values/nulls/replaces and set update_tuple; the tuple is rewritten once
 * after the switch.  Returns the subscription's ObjectAddress.
 */
670 {
671  Relation rel;
672  ObjectAddress myself;
673  bool nulls[Natts_pg_subscription];
674  bool replaces[Natts_pg_subscription];
675  Datum values[Natts_pg_subscription];
676  HeapTuple tup;
677  Oid subid;
678  bool update_tuple = false;
679  Subscription *sub;
681 
682  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
683 
684  /* Fetch the existing tuple. */
686  CStringGetDatum(stmt->subname));
687 
688  if (!HeapTupleIsValid(tup))
689  ereport(ERROR,
690  (errcode(ERRCODE_UNDEFINED_OBJECT),
691  errmsg("subscription \"%s\" does not exist",
692  stmt->subname)));
693 
694  form = (Form_pg_subscription) GETSTRUCT(tup);
695  subid = form->oid;
696 
697  /* must be owner */
698  if (!pg_subscription_ownercheck(subid, GetUserId()))
700  stmt->subname);
701 
702  sub = GetSubscription(subid, false);
703 
704  /* Lock the subscription so nobody else can do anything with it. */
705  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
706 
707  /* Form a new tuple. */
708  memset(values, 0, sizeof(values));
709  memset(nulls, false, sizeof(nulls));
710  memset(replaces, false, sizeof(replaces));
711 
712  switch (stmt->kind)
713  {
 /* Branch: slot_name, synchronous_commit, binary and streaming options. */
715  {
716  char *slotname;
717  bool slotname_given;
718  char *synchronous_commit;
719  bool binary_given;
720  bool binary;
721  bool streaming_given;
722  bool streaming;
723 
725  NULL, /* no "connect" */
726  NULL, NULL, /* no "enabled" */
727  NULL, /* no "create_slot" */
728  &slotname_given, &slotname,
729  NULL, /* no "copy_data" */
730  &synchronous_commit,
731  NULL, /* no "refresh" */
732  &binary_given, &binary,
733  &streaming_given, &streaming);
734 
735  if (slotname_given)
736  {
737  if (sub->enabled && !slotname)
738  ereport(ERROR,
739  (errcode(ERRCODE_SYNTAX_ERROR),
740  errmsg("cannot set %s for enabled subscription",
741  "slot_name = NONE")));
742 
743  if (slotname)
744  values[Anum_pg_subscription_subslotname - 1] =
746  else
747  nulls[Anum_pg_subscription_subslotname - 1] = true;
748  replaces[Anum_pg_subscription_subslotname - 1] = true;
749  }
750 
751  if (synchronous_commit)
752  {
753  values[Anum_pg_subscription_subsynccommit - 1] =
754  CStringGetTextDatum(synchronous_commit);
755  replaces[Anum_pg_subscription_subsynccommit - 1] = true;
756  }
757 
758  if (binary_given)
759  {
760  values[Anum_pg_subscription_subbinary - 1] =
761  BoolGetDatum(binary);
762  replaces[Anum_pg_subscription_subbinary - 1] = true;
763  }
764 
765  if (streaming_given)
766  {
767  values[Anum_pg_subscription_substream - 1] =
768  BoolGetDatum(streaming);
769  replaces[Anum_pg_subscription_substream - 1] = true;
770  }
771 
772  update_tuple = true;
773  break;
774  }
775 
 /* Branch: enable or disable the subscription. */
777  {
778  bool enabled,
779  enabled_given;
780 
782  NULL, /* no "connect" */
783  &enabled_given, &enabled,
784  NULL, /* no "create_slot" */
785  NULL, NULL, /* no "slot_name" */
786  NULL, /* no "copy_data" */
787  NULL, /* no "synchronous_commit" */
788  NULL, /* no "refresh" */
789  NULL, NULL, /* no "binary" */
790  NULL, NULL); /* no streaming */
791  Assert(enabled_given);
792 
793  if (!sub->slotname && enabled)
794  ereport(ERROR,
795  (errcode(ERRCODE_SYNTAX_ERROR),
796  errmsg("cannot enable subscription that does not have a slot name")));
797 
798  values[Anum_pg_subscription_subenabled - 1] =
799  BoolGetDatum(enabled);
800  replaces[Anum_pg_subscription_subenabled - 1] = true;
801 
802  if (enabled)
804 
805  update_tuple = true;
806  break;
807  }
808 
 /* Branch: replace the connection string after validating it. */
810  /* Load the library providing us libpq calls. */
811  load_file("libpqwalreceiver", false);
812  /* Check the connection info string. */
814 
815  values[Anum_pg_subscription_subconninfo - 1] =
817  replaces[Anum_pg_subscription_subconninfo - 1] = true;
818  update_tuple = true;
819  break;
820 
 /* Branch: replace the publication list, optionally refreshing tables. */
822  {
823  bool copy_data;
824  bool refresh;
825 
827  NULL, /* no "connect" */
828  NULL, NULL, /* no "enabled" */
829  NULL, /* no "create_slot" */
830  NULL, NULL, /* no "slot_name" */
831  &copy_data,
832  NULL, /* no "synchronous_commit" */
833  &refresh,
834  NULL, NULL, /* no "binary" */
835  NULL, NULL); /* no "streaming" */
836  values[Anum_pg_subscription_subpublications - 1] =
838  replaces[Anum_pg_subscription_subpublications - 1] = true;
839 
840  update_tuple = true;
841 
842  /* Refresh if user asked us to. */
843  if (refresh)
844  {
845  if (!sub->enabled)
846  ereport(ERROR,
847  (errcode(ERRCODE_SYNTAX_ERROR),
848  errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
849  errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
850 
851  /* Make sure refresh sees the new list of publications. */
852  sub->publications = stmt->publication;
853 
854  AlterSubscription_refresh(sub, copy_data);
855  }
856 
857  break;
858  }
859 
 /* Branch: refresh only; no catalog column changes, update_tuple stays false. */
861  {
862  bool copy_data;
863 
864  if (!sub->enabled)
865  ereport(ERROR,
866  (errcode(ERRCODE_SYNTAX_ERROR),
867  errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
868 
870  NULL, /* no "connect" */
871  NULL, NULL, /* no "enabled" */
872  NULL, /* no "create_slot" */
873  NULL, NULL, /* no "slot_name" */
874  &copy_data,
875  NULL, /* no "synchronous_commit" */
876  NULL, /* no "refresh" */
877  NULL, NULL, /* no "binary" */
878  NULL, NULL); /* no "streaming" */
879 
880  AlterSubscription_refresh(sub, copy_data);
881 
882  break;
883  }
884 
885  default:
886  elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
887  stmt->kind);
888  }
889 
890  /* Update the catalog if needed. */
891  if (update_tuple)
892  {
893  tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
894  replaces);
895 
896  CatalogTupleUpdate(rel, &tup->t_self, tup);
897 
898  heap_freetuple(tup);
899  }
900 
902 
903  ObjectAddressSet(myself, SubscriptionRelationId, subid);
904 
905  InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
906 
907  return myself;
908 }
909 
910 /*
911  * Drop a subscription
912  */
/*
 * Implements DROP SUBSCRIPTION.
 *
 * If the subscription does not exist this either raises an error or, with
 * stmt->missing_ok, emits a NOTICE and returns.  Otherwise the caller must
 * own the subscription.  The function copies subname, conninfo and the
 * (possibly NULL) slot name out of the catalog tuple, deletes the tuple,
 * stops the subscription's workers, removes dependency records, relation
 * states and the origin "pg_<subid>", and finally, only when a slot name is
 * set, connects to the publisher and issues DROP_REPLICATION_SLOT.
 *
 * When a slot name is set the command is refused inside a transaction
 * block, because the remote slot drop cannot be rolled back.
 */
913 void
914 DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
915 {
916  Relation rel;
917  ObjectAddress myself;
918  HeapTuple tup;
919  Oid subid;
920  Datum datum;
921  bool isnull;
922  char *subname;
923  char *conninfo;
924  char *slotname;
925  List *subworkers;
926  ListCell *lc;
927  char originname[NAMEDATALEN];
928  char *err = NULL;
929  RepOriginId originid;
930  WalReceiverConn *wrconn = NULL;
931  StringInfoData cmd;
933 
934  /*
935  * Lock pg_subscription with AccessExclusiveLock to ensure that the
936  * launcher doesn't restart new worker during dropping the subscription
937  */
938  rel = table_open(SubscriptionRelationId, AccessExclusiveLock);
939 
941  CStringGetDatum(stmt->subname));
942 
943  if (!HeapTupleIsValid(tup))
944  {
945  table_close(rel, NoLock);
946 
947  if (!stmt->missing_ok)
948  ereport(ERROR,
949  (errcode(ERRCODE_UNDEFINED_OBJECT),
950  errmsg("subscription \"%s\" does not exist",
951  stmt->subname)));
952  else
953  ereport(NOTICE,
954  (errmsg("subscription \"%s\" does not exist, skipping",
955  stmt->subname)));
956 
957  return;
958  }
959 
960  form = (Form_pg_subscription) GETSTRUCT(tup);
961  subid = form->oid;
962 
963  /* must be owner */
964  if (!pg_subscription_ownercheck(subid, GetUserId()))
966  stmt->subname);
967 
968  /* DROP hook for the subscription being removed */
969  InvokeObjectDropHook(SubscriptionRelationId, subid, 0);
970 
971  /*
972  * Lock the subscription so nobody else can do anything with it (including
973  * the replication workers).
974  */
975  LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
976 
 /* Copy out everything needed later; the tuple is released further down. */
977  /* Get subname */
978  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
979  Anum_pg_subscription_subname, &isnull);
980  Assert(!isnull);
981  subname = pstrdup(NameStr(*DatumGetName(datum)));
982 
983  /* Get conninfo */
984  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
985  Anum_pg_subscription_subconninfo, &isnull);
986  Assert(!isnull);
987  conninfo = TextDatumGetCString(datum);
988 
989  /* Get slotname */
990  datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
991  Anum_pg_subscription_subslotname, &isnull);
992  if (!isnull)
993  slotname = pstrdup(NameStr(*DatumGetName(datum)));
994  else
995  slotname = NULL;
996 
997  /*
998  * Since dropping a replication slot is not transactional, the replication
999  * slot stays dropped even if the transaction rolls back. So we cannot
1000  * run DROP SUBSCRIPTION inside a transaction block if dropping the
1001  * replication slot.
1002  *
1003  * XXX The command name should really be something like "DROP SUBSCRIPTION
1004  * of a subscription that is associated with a replication slot", but we
1005  * don't have the proper facilities for that.
1006  */
1007  if (slotname)
1008  PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
1009 
1010  ObjectAddressSet(myself, SubscriptionRelationId, subid);
1011  EventTriggerSQLDropAddObject(&myself, true, true);
1012 
1013  /* Remove the tuple from catalog. */
1014  CatalogTupleDelete(rel, &tup->t_self);
1015 
1016  ReleaseSysCache(tup);
1017 
1018  /*
1019  * Stop all the subscription workers immediately.
1020  *
1021  * This is necessary if we are dropping the replication slot, so that the
1022  * slot becomes accessible.
1023  *
1024  * It is also necessary if the subscription is disabled and was disabled
1025  * in the same transaction. Then the workers haven't seen the disabling
1026  * yet and will still be running, leading to hangs later when we want to
1027  * drop the replication origin. If the subscription was disabled before
1028  * this transaction, then there shouldn't be any workers left, so this
1029  * won't make a difference.
1030  *
1031  * New workers won't be started because we hold an exclusive lock on the
1032  * subscription till the end of the transaction.
1033  */
 /* The lock is held only while collecting the list, not while stopping. */
1034  LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1035  subworkers = logicalrep_workers_find(subid, false);
1036  LWLockRelease(LogicalRepWorkerLock);
1037  foreach(lc, subworkers)
1038  {
1040 
1042  }
1043  list_free(subworkers);
1044 
1045  /* Clean up dependencies */
1046  deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
1047 
1048  /* Remove any associated relation synchronization states. */
1050 
1051  /* Remove the origin tracking if exists. */
1052  snprintf(originname, sizeof(originname), "pg_%u", subid);
1053  originid = replorigin_by_name(originname, true);
1054  if (originid != InvalidRepOriginId)
1055  replorigin_drop(originid, false);
1056 
1057  /*
1058  * If there is no slot associated with the subscription, we can finish
1059  * here.
1060  */
1061  if (!slotname)
1062  {
1063  table_close(rel, NoLock);
1064  return;
1065  }
1066 
1067  /*
1068  * Otherwise drop the replication slot at the publisher node using the
1069  * replication connection.
1070  */
1071  load_file("libpqwalreceiver", false);
1072 
1073  initStringInfo(&cmd);
1074  appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
1075 
 /* NOTE(review): this error is raised without an explicit errcode(). */
1076  wrconn = walrcv_connect(conninfo, true, subname, &err);
1077  if (wrconn == NULL)
1078  ereport(ERROR,
1079  (errmsg("could not connect to publisher when attempting to "
1080  "drop the replication slot \"%s\"", slotname),
1081  errdetail("The error was: %s", err),
1082  /* translator: %s is an SQL ALTER command */
1083  errhint("Use %s to disassociate the subscription from the slot.",
1084  "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
1085 
1086  PG_TRY();
1087  {
1088  WalRcvExecResult *res;
1089 
1090  res = walrcv_exec(wrconn, cmd.data, 0, NULL);
1091 
1092  if (res->status != WALRCV_OK_COMMAND)
1093  ereport(ERROR,
1094  (errmsg("could not drop the replication slot \"%s\" on publisher",
1095  slotname),
1096  errdetail("The error was: %s", res->err)));
1097  else
1098  ereport(NOTICE,
1099  (errmsg("dropped replication slot \"%s\" on publisher",
1100  slotname)));
1101 
1102  walrcv_clear_result(res);
1103  }
 /* Disconnect whether or not the slot drop succeeded. */
1104  PG_FINALLY();
1105  {
1106  walrcv_disconnect(wrconn);
1107  }
1108  PG_END_TRY();
1109 
1110  pfree(cmd.data);
1111 
1112  table_close(rel, NoLock);
1113 }
1114 
1115 /*
1116  * Internal workhorse for changing a subscription owner
1117  */
/*
 * Shared implementation for both owner-change entry points.
 *
 * Does nothing when the subscription is already owned by newOwnerId.
 * Otherwise the caller must own the subscription and the new owner must be
 * a superuser; the tuple's subowner field is then changed in place, the
 * catalog row is updated, the owner dependency is switched and the
 * post-alter hook is invoked.
 *
 * NOTE(review): the name/parameter line is absent from this listing; the
 * visible call site passes (rel, tup, newOwnerId).
 */
1118 static void
1120 {
1121  Form_pg_subscription form;
1122 
1123  form = (Form_pg_subscription) GETSTRUCT(tup);
1124 
 /* Nothing to do if the owner is unchanged. */
1125  if (form->subowner == newOwnerId)
1126  return;
1127 
1128  if (!pg_subscription_ownercheck(form->oid, GetUserId()))
1130  NameStr(form->subname));
1131 
1132  /* New owner must be a superuser */
1133  if (!superuser_arg(newOwnerId))
1134  ereport(ERROR,
1135  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1136  errmsg("permission denied to change owner of subscription \"%s\"",
1137  NameStr(form->subname)),
1138  errhint("The owner of a subscription must be a superuser.")));
1139 
 /* Writes through the tuple passed in; callers hand over a tuple they later free. */
1140  form->subowner = newOwnerId;
1141  CatalogTupleUpdate(rel, &tup->t_self, tup);
1142 
1143  /* Update owner dependency reference */
1144  changeDependencyOnOwner(SubscriptionRelationId,
1145  form->oid,
1146  newOwnerId);
1147 
1148  InvokeObjectPostAlterHook(SubscriptionRelationId,
1149  form->oid, 0);
1150 }
1151 
1152 /*
1153  * Change subscription owner -- by name
1154  */
/*
 * Looks the subscription up by name, raises ERRCODE_UNDEFINED_OBJECT if it
 * does not exist, delegates the ownership change to
 * AlterSubscriptionOwner_internal(), and returns the subscription's
 * ObjectAddress.
 *
 * NOTE(review): the return-type line and the lookup/close call lines are
 * absent from this listing.
 */
1156 AlterSubscriptionOwner(const char *name, Oid newOwnerId)
1157 {
1158  Oid subid;
1159  HeapTuple tup;
1160  Relation rel;
1161  ObjectAddress address;
1162  Form_pg_subscription form;
1163 
1164  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1165 
1167  CStringGetDatum(name));
1168 
1169  if (!HeapTupleIsValid(tup))
1170  ereport(ERROR,
1171  (errcode(ERRCODE_UNDEFINED_OBJECT),
1172  errmsg("subscription \"%s\" does not exist", name)));
1173 
1174  form = (Form_pg_subscription) GETSTRUCT(tup);
 /* Save the OID now; the tuple is freed before the address is returned. */
1175  subid = form->oid;
1176 
1177  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1178 
1179  ObjectAddressSet(address, SubscriptionRelationId, subid);
1180 
1181  heap_freetuple(tup);
1182 
1184 
1185  return address;
1186 }
1187 
1188 /*
1189  * Change subscription owner -- by OID
1190  */
/*
 * Same as the by-name variant, but the subscription is identified by OID
 * and nothing is returned.  Raises ERRCODE_UNDEFINED_OBJECT when no
 * subscription with that OID exists.
 *
 * NOTE(review): the name/parameter line and the lookup/close call lines
 * are absent from this listing; the body uses "subid" and "newOwnerId".
 */
1191 void
1193 {
1194  HeapTuple tup;
1195  Relation rel;
1196 
1197  rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1198 
1200 
1201  if (!HeapTupleIsValid(tup))
1202  ereport(ERROR,
1203  (errcode(ERRCODE_UNDEFINED_OBJECT),
1204  errmsg("subscription with OID %u does not exist", subid)));
1205 
1206  AlterSubscriptionOwner_internal(rel, tup, newOwnerId);
1207 
1208  heap_freetuple(tup);
1209 
1211 }
1212 
1213 /*
1214  * Get the list of tables which belong to specified publications on the
1215  * publisher connection.
1216  */
1217 static List *
1219 {
1220  WalRcvExecResult *res;
1221  StringInfoData cmd;
1222  TupleTableSlot *slot;
1223  Oid tableRow[2] = {TEXTOID, TEXTOID};
1224  ListCell *lc;
1225  bool first;
1226  List *tablelist = NIL;
1227 
1228  Assert(list_length(publications) > 0);
1229 
1230  initStringInfo(&cmd);
1231  appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
1232  " FROM pg_catalog.pg_publication_tables t\n"
1233  " WHERE t.pubname IN (");
1234  first = true;
1235  foreach(lc, publications)
1236  {
1237  char *pubname = strVal(lfirst(lc));
1238 
1239  if (first)
1240  first = false;
1241  else
1242  appendStringInfoString(&cmd, ", ");
1243 
1245  }
1246  appendStringInfoChar(&cmd, ')');
1247 
1248  res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
1249  pfree(cmd.data);
1250 
1251  if (res->status != WALRCV_OK_TUPLES)
1252  ereport(ERROR,
1253  (errmsg("could not receive list of replicated tables from the publisher: %s",
1254  res->err)));
1255 
1256  /* Process tables. */
1258  while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1259  {
1260  char *nspname;
1261  char *relname;
1262  bool isnull;
1263  RangeVar *rv;
1264 
1265  nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
1266  Assert(!isnull);
1267  relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
1268  Assert(!isnull);
1269 
1270  rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1);
1271  tablelist = lappend(tablelist, rv);
1272 
1273  ExecClearTuple(slot);
1274  }
1276 
1277  walrcv_clear_result(res);
1278 
1279  return tablelist;
1280 }
#define NIL
Definition: pg_list.h:65
static List * fetch_table_list(WalReceiverConn *wrconn, List *publications)
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition: catalog.c:317
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
WalReceiverConn * wrconn
Definition: worker.c:161
ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt)
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
List * logicalrep_workers_find(Oid subid, bool only_running)
Definition: launcher.c:263
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:212
#define AllocSetContextCreate
Definition: memutils.h:170
Datum namein(PG_FUNCTION_ARGS)
Definition: name.c:48
#define DEBUG1
Definition: elog.h:25
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:167
int errhint(const char *fmt,...)
Definition: elog.c:1068
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:102
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:151
void RemoveSubscriptionRel(Oid subid, Oid relid)
const char * quote_identifier(const char *ident)
Definition: ruleutils.c:10709
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition: tuptable.h:425
#define RelationGetDescr(relation)
Definition: rel.h:482
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1208
Oid GetUserId(void)
Definition: miscinit.c:476
#define PointerGetDatum(X)
Definition: postgres.h:556
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition: namespace.h:78
#define walrcv_check_conninfo(conninfo)
Definition: walreceiver.h:400
char get_rel_relkind(Oid relid)
Definition: lsyscache.c:1915
char * pstrdup(const char *in)
Definition: mcxt.c:1187
Oid get_rel_namespace(Oid relid)
Definition: lsyscache.c:1864
void replorigin_drop(RepOriginId roident, bool nowait)
Definition: origin.c:332
#define InvokeObjectDropHook(classId, objectId, subId)
Definition: objectaccess.h:160
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
ArrayType * construct_array(Datum *elems, int nelems, Oid elmtype, int elmlen, bool elmbyval, char elmalign)
Definition: arrayfuncs.c:3313
#define AccessShareLock
Definition: lockdefs.h:36
uint16 RepOriginId
Definition: xlogdefs.h:58
#define strVal(v)
Definition: value.h:54
static bool create_slot
int errcode(int sqlerrcode)
Definition: elog.c:610
bool superuser(void)
Definition: superuser.c:46
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:350
static Datum publicationListToArray(List *publist)
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
#define connect(s, name, namelen)
Definition: win32_port.h:463
#define DirectFunctionCall1(func, arg1)
Definition: fmgr.h:624
FormData_pg_subscription * Form_pg_subscription
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:164
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
NameData relname
Definition: pg_class.h:38
unsigned int Oid
Definition: postgres_ext.h:31
NameData subname
ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
#define walrcv_create_slot(conn, slotname, temporary, snapshot_action, lsn)
Definition: walreceiver.h:420
#define OidIsValid(objectId)
Definition: c.h:652
RepOriginId replorigin_by_name(char *roname, bool missing_ok)
Definition: origin.c:209
char * schemaname
Definition: primnodes.h:67
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1812
#define NAMEDATALEN
#define DatumGetName(X)
Definition: postgres.h:585
char * relname
Definition: primnodes.h:68
Subscription * GetSubscription(Oid subid, bool missing_ok)
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
Definition: pg_shdepend.c:309
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition: aclchk.c:3294
bool defGetBoolean(DefElem *def)
Definition: define.c:111
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:430
void pfree(void *pointer)
Definition: mcxt.c:1057
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:91
TupleDesc tupledesc
Definition: walreceiver.h:215
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
char * defGetString(DefElem *def)
Definition: define.c:49
ItemPointerData t_self
Definition: htup.h:65
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, bool *copy_data, char **synchronous_commit, bool *refresh, bool *binary_given, bool *binary, bool *streaming_given, bool *streaming)
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:176
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn)
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3191
#define NoLock
Definition: lockdefs.h:34
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1224
AlterSubscriptionType kind
Definition: parsenodes.h:3561
void logicalrep_worker_stop(Oid subid, Oid relid)
Definition: launcher.c:456
#define RowExclusiveLock
Definition: lockdefs.h:38
int errdetail(const char *fmt,...)
Definition: elog.c:954
List * GetSubscriptionRelations(Oid subid)
#define CStringGetDatum(X)
Definition: postgres.h:578
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3380
List * publications
void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname)
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
RepOriginId replorigin_create(char *roname)
Definition: origin.c:240
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
int synchronous_commit
Definition: xact.c:83
#define InvokeObjectPostAlterHook(classId, objectId, subId)
Definition: objectaccess.h:175
bool superuser_arg(Oid roleid)
Definition: superuser.c:56
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
Definition: pg_shdepend.c:945
List * lappend(List *list, void *datum)
Definition: list.c:321
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:188
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define WARNING
Definition: elog.h:40
#define PG_FINALLY()
Definition: elog.h:312
#define TextDatumGetCString(d)
Definition: builtins.h:87
uintptr_t Datum
Definition: postgres.h:367
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1164
Datum SysCacheGetAttr(int cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition: syscache.c:1377
Oid MyDatabaseId
Definition: globals.c:85
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition: lmgr.c:1017
#define BoolGetDatum(X)
Definition: postgres.h:402
#define InvalidOid
Definition: postgres_ext.h:36
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:381
#define ereport(elevel,...)
Definition: elog.h:144
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
#define NOTICE
Definition: elog.h:37
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1078
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
Tuplestorestate * tuplestore
Definition: walreceiver.h:214
#define Assert(condition)
Definition: c.h:746
#define lfirst(lc)
Definition: pg_list.h:169
WalRcvExecStatus status
Definition: walreceiver.h:212
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition: syscache.h:194
void load_file(const char *filename, bool restricted)
Definition: dfmgr.c:146
int oid_cmp(const void *p1, const void *p2)
Definition: oid.c:336
void CatalogTupleUpdate(Relation heapRel, ItemPointer otid, HeapTuple tup)
Definition: indexing.c:301
static int list_length(const List *l)
Definition: pg_list.h:149
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
HeapTuple SearchSysCache2(int cacheId, Datum key1, Datum key2)
Definition: syscache.c:1127
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1208
#define walrcv_disconnect(conn)
Definition: walreceiver.h:426
const char * name
Definition: encode.c:561
#define InvalidRepOriginId
Definition: origin.h:33
#define ObjectAddressSet(addr, class_id, object_id)
Definition: objectaddress.h:40
static Datum values[MAXATTR]
Definition: bootstrap.c:165
#define SearchSysCacheCopy1(cacheId, key1)
Definition: syscache.h:174
#define AccessExclusiveLock
Definition: lockdefs.h:45
void * palloc(Size size)
Definition: mcxt.c:950
int errmsg(const char *fmt,...)
Definition: elog.c:821
void list_free(List *list)
Definition: list.c:1376
#define elog(elevel,...)
Definition: elog.h:214
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
#define NameStr(name)
Definition: c.h:623
#define CStringGetTextDatum(s)
Definition: builtins.h:86
char * defname
Definition: parsenodes.h:733
void logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
Definition: launcher.c:553
#define qsort(a, b, c, d)
Definition: port.h:497
void ApplyLauncherWakeupAtCommit(void)
Definition: launcher.c:933
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition: syscache.h:176
#define PG_TRY()
Definition: elog.h:295
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition: guc.c:6949
Definition: pg_list.h:50
char * get_rel_name(Oid relid)
Definition: lsyscache.c:1840
#define snprintf
Definition: port.h:215
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:85
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition: indexing.c:221
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition: makefuncs.c:422
#define PG_END_TRY()
Definition: elog.h:320
static void AlterSubscription_refresh(Subscription *sub, bool copy_data)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:424
bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid)
Definition: aclchk.c:5251
#define SubscriptionObjectIndexId
Definition: indexing.h:369
#define walrcv_connect(conninfo, logical, appname, err)
Definition: walreceiver.h:398