PostgreSQL Source Code git master
Loading...
Searching...
No Matches
subscriptioncmds.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * subscriptioncmds.c
4 * subscription catalog manipulation functions
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/commands/subscriptioncmds.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/commit_ts.h"
18#include "access/htup_details.h"
19#include "access/table.h"
20#include "access/twophase.h"
21#include "access/xact.h"
22#include "catalog/catalog.h"
23#include "catalog/dependency.h"
24#include "catalog/indexing.h"
25#include "catalog/namespace.h"
28#include "catalog/pg_authid_d.h"
29#include "catalog/pg_database_d.h"
33#include "catalog/pg_type.h"
35#include "commands/defrem.h"
38#include "executor/executor.h"
39#include "foreign/foreign.h"
40#include "miscadmin.h"
41#include "nodes/makefuncs.h"
42#include "pgstat.h"
45#include "replication/origin.h"
46#include "replication/slot.h"
50#include "storage/lmgr.h"
51#include "utils/acl.h"
52#include "utils/builtins.h"
53#include "utils/guc.h"
54#include "utils/lsyscache.h"
55#include "utils/memutils.h"
56#include "utils/pg_lsn.h"
57#include "utils/syscache.h"
58
59/*
60 * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
61 * command.
62 */
63#define SUBOPT_CONNECT 0x00000001
64#define SUBOPT_ENABLED 0x00000002
65#define SUBOPT_CREATE_SLOT 0x00000004
66#define SUBOPT_SLOT_NAME 0x00000008
67#define SUBOPT_COPY_DATA 0x00000010
68#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
69#define SUBOPT_REFRESH 0x00000040
70#define SUBOPT_BINARY 0x00000080
71#define SUBOPT_STREAMING 0x00000100
72#define SUBOPT_TWOPHASE_COMMIT 0x00000200
73#define SUBOPT_DISABLE_ON_ERR 0x00000400
74#define SUBOPT_PASSWORD_REQUIRED 0x00000800
75#define SUBOPT_RUN_AS_OWNER 0x00001000
76#define SUBOPT_FAILOVER 0x00002000
77#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
78#define SUBOPT_MAX_RETENTION_DURATION 0x00008000
79#define SUBOPT_WAL_RECEIVER_TIMEOUT 0x00010000
80#define SUBOPT_LSN 0x00020000
81#define SUBOPT_ORIGIN 0x00040000
82
83/* check if the 'val' has 'bits' set */
84#define IsSet(val, bits) (((val) & (bits)) == (bits))
85
86/*
87 * Structure to hold a bitmap representing the user-provided CREATE/ALTER
88 * SUBSCRIPTION command options and the parsed/default values of each of them.
89 */
113
114/*
115 * PublicationRelKind represents a relation included in a publication.
116 * It stores the schema-qualified relation name (rv) and its kind (relkind).
117 */
123
124static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
126 List *publications, bool copydata,
128 char *origin,
130 int subrel_count, char *subname);
132 List *publications,
133 bool copydata, char *origin,
135 int subrel_count,
136 char *subname);
138static void check_duplicates_in_publist(List *publist, Datum *datums);
139static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
140static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
141static void CheckAlterSubOption(Subscription *sub, const char *option,
142 bool slot_needs_update, bool isTopLevel);
143
144
145/*
146 * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
147 *
148 * Since not all options can be specified in both commands, this function
149 * will report an error if mutually exclusive options are specified.
150 */
151static void
154{
155 ListCell *lc;
156
157 /* Start out with cleared opts. */
158 memset(opts, 0, sizeof(SubOpts));
159
160 /* caller must expect some option */
162
163 /* If connect option is supported, these others also need to be. */
167
168 /* Set default values for the supported options. */
170 opts->connect = true;
172 opts->enabled = true;
174 opts->create_slot = true;
176 opts->copy_data = true;
178 opts->refresh = true;
180 opts->binary = false;
182 opts->streaming = LOGICALREP_STREAM_PARALLEL;
184 opts->twophase = false;
186 opts->disableonerr = false;
188 opts->passwordrequired = true;
190 opts->runasowner = false;
192 opts->failover = false;
194 opts->retaindeadtuples = false;
196 opts->maxretention = 0;
199
200 /* Parse options */
201 foreach(lc, stmt_options)
202 {
203 DefElem *defel = (DefElem *) lfirst(lc);
204
206 strcmp(defel->defname, "connect") == 0)
207 {
208 if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
210
211 opts->specified_opts |= SUBOPT_CONNECT;
212 opts->connect = defGetBoolean(defel);
213 }
215 strcmp(defel->defname, "enabled") == 0)
216 {
217 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
219
220 opts->specified_opts |= SUBOPT_ENABLED;
221 opts->enabled = defGetBoolean(defel);
222 }
224 strcmp(defel->defname, "create_slot") == 0)
225 {
226 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
228
229 opts->specified_opts |= SUBOPT_CREATE_SLOT;
230 opts->create_slot = defGetBoolean(defel);
231 }
233 strcmp(defel->defname, "slot_name") == 0)
234 {
235 if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
237
238 opts->specified_opts |= SUBOPT_SLOT_NAME;
239 opts->slot_name = defGetString(defel);
240
241 /* Setting slot_name = NONE is treated as no slot name. */
242 if (strcmp(opts->slot_name, "none") == 0)
243 opts->slot_name = NULL;
244 else
245 ReplicationSlotValidateName(opts->slot_name, false, ERROR);
246 }
248 strcmp(defel->defname, "copy_data") == 0)
249 {
250 if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
252
253 opts->specified_opts |= SUBOPT_COPY_DATA;
254 opts->copy_data = defGetBoolean(defel);
255 }
257 strcmp(defel->defname, "synchronous_commit") == 0)
258 {
259 if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
261
262 opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
263 opts->synchronous_commit = defGetString(defel);
264
265 /* Test if the given value is valid for synchronous_commit GUC. */
266 (void) set_config_option("synchronous_commit", opts->synchronous_commit,
268 false, 0, false);
269 }
271 strcmp(defel->defname, "refresh") == 0)
272 {
273 if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
275
276 opts->specified_opts |= SUBOPT_REFRESH;
277 opts->refresh = defGetBoolean(defel);
278 }
280 strcmp(defel->defname, "binary") == 0)
281 {
282 if (IsSet(opts->specified_opts, SUBOPT_BINARY))
284
285 opts->specified_opts |= SUBOPT_BINARY;
286 opts->binary = defGetBoolean(defel);
287 }
289 strcmp(defel->defname, "streaming") == 0)
290 {
291 if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
293
294 opts->specified_opts |= SUBOPT_STREAMING;
295 opts->streaming = defGetStreamingMode(defel);
296 }
298 strcmp(defel->defname, "two_phase") == 0)
299 {
300 if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
302
303 opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
304 opts->twophase = defGetBoolean(defel);
305 }
307 strcmp(defel->defname, "disable_on_error") == 0)
308 {
309 if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
311
312 opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
313 opts->disableonerr = defGetBoolean(defel);
314 }
316 strcmp(defel->defname, "password_required") == 0)
317 {
318 if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
320
321 opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
322 opts->passwordrequired = defGetBoolean(defel);
323 }
325 strcmp(defel->defname, "run_as_owner") == 0)
326 {
327 if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
329
330 opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
331 opts->runasowner = defGetBoolean(defel);
332 }
334 strcmp(defel->defname, "failover") == 0)
335 {
336 if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
338
339 opts->specified_opts |= SUBOPT_FAILOVER;
340 opts->failover = defGetBoolean(defel);
341 }
343 strcmp(defel->defname, "retain_dead_tuples") == 0)
344 {
345 if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
347
348 opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
349 opts->retaindeadtuples = defGetBoolean(defel);
350 }
352 strcmp(defel->defname, "max_retention_duration") == 0)
353 {
354 if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
356
357 opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
358 opts->maxretention = defGetInt32(defel);
359
360 if (opts->maxretention < 0)
363 errmsg("max_retention_duration cannot be negative"));
364 }
366 strcmp(defel->defname, "origin") == 0)
367 {
368 if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
370
371 opts->specified_opts |= SUBOPT_ORIGIN;
372 pfree(opts->origin);
373
374 /*
375 * Even though the "origin" parameter allows only "none" and "any"
376 * values, it is implemented as a string type so that the
377 * parameter can be extended in future versions to support
378 * filtering using origin names specified by the user.
379 */
380 opts->origin = defGetString(defel);
381
382 if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
386 errmsg("unrecognized origin value: \"%s\"", opts->origin));
387 }
388 else if (IsSet(supported_opts, SUBOPT_LSN) &&
389 strcmp(defel->defname, "lsn") == 0)
390 {
391 char *lsn_str = defGetString(defel);
392 XLogRecPtr lsn;
393
394 if (IsSet(opts->specified_opts, SUBOPT_LSN))
396
397 /* Setting lsn = NONE is treated as resetting LSN */
398 if (strcmp(lsn_str, "none") == 0)
399 lsn = InvalidXLogRecPtr;
400 else
401 {
402 /* Parse the argument as LSN */
405
406 if (!XLogRecPtrIsValid(lsn))
409 errmsg("invalid WAL location (LSN): %s", lsn_str)));
410 }
411
412 opts->specified_opts |= SUBOPT_LSN;
413 opts->lsn = lsn;
414 }
416 strcmp(defel->defname, "wal_receiver_timeout") == 0)
417 {
418 bool parsed;
419 int val;
420
421 if (IsSet(opts->specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
423
424 opts->specified_opts |= SUBOPT_WAL_RECEIVER_TIMEOUT;
425 opts->wal_receiver_timeout = defGetString(defel);
426
427 /*
428 * Test if the given value is valid for wal_receiver_timeout GUC.
429 * Skip this test if the value is -1, since -1 is allowed for the
430 * wal_receiver_timeout subscription option, but not for the GUC
431 * itself.
432 */
433 parsed = parse_int(opts->wal_receiver_timeout, &val, 0, NULL);
434 if (!parsed || val != -1)
435 (void) set_config_option("wal_receiver_timeout", opts->wal_receiver_timeout,
437 false, 0, false);
438 }
439 else
442 errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
443 }
444
445 /*
446 * We've been explicitly asked to not connect, that requires some
447 * additional processing.
448 */
449 if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
450 {
451 /* Check for incompatible options from the user. */
452 if (opts->enabled &&
453 IsSet(opts->specified_opts, SUBOPT_ENABLED))
456 /*- translator: both %s are strings of the form "option = value" */
457 errmsg("%s and %s are mutually exclusive options",
458 "connect = false", "enabled = true")));
459
460 if (opts->create_slot &&
461 IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
464 errmsg("%s and %s are mutually exclusive options",
465 "connect = false", "create_slot = true")));
466
467 if (opts->copy_data &&
468 IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
471 errmsg("%s and %s are mutually exclusive options",
472 "connect = false", "copy_data = true")));
473
474 /* Change the defaults of other options. */
475 opts->enabled = false;
476 opts->create_slot = false;
477 opts->copy_data = false;
478 }
479
480 /*
481 * Do additional checking for disallowed combination when slot_name = NONE
482 * was used.
483 */
484 if (!opts->slot_name &&
485 IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
486 {
487 if (opts->enabled)
488 {
489 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
492 /*- translator: both %s are strings of the form "option = value" */
493 errmsg("%s and %s are mutually exclusive options",
494 "slot_name = NONE", "enabled = true")));
495 else
498 /*- translator: both %s are strings of the form "option = value" */
499 errmsg("subscription with %s must also set %s",
500 "slot_name = NONE", "enabled = false")));
501 }
502
503 if (opts->create_slot)
504 {
505 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
508 /*- translator: both %s are strings of the form "option = value" */
509 errmsg("%s and %s are mutually exclusive options",
510 "slot_name = NONE", "create_slot = true")));
511 else
514 /*- translator: both %s are strings of the form "option = value" */
515 errmsg("subscription with %s must also set %s",
516 "slot_name = NONE", "create_slot = false")));
517 }
518 }
519}
520
521/*
522 * Append a suitably-quoted identifier or string literal to buf.
523 * "quote" should be either a double-quote or single-quote character.
524 *
525 * Caution: this quoting logic is sufficient for identifiers and literals
526 * in the replication grammar, but not always in regular SQL. Specifically,
527 * it'd fail for a string literal if standard_conforming_strings is off.
528 */
529static void
530appendQuotedString(StringInfo buf, const char *str, char quote)
531{
533 while (*str)
534 {
535 char c = *str++;
536
537 if (c == quote)
540 }
542}
543
544#define appendQuotedIdentifier(b, s) appendQuotedString(b, s, '"')
545#define appendQuotedLiteral(b, s) appendQuotedString(b, s, '\'')
546
547/*
548 * Check that the specified publications are present on the publisher.
549 */
550static void
551check_publications(WalReceiverConn *wrconn, List *publications)
552{
553 WalRcvExecResult *res;
554 StringInfoData cmd;
555 TupleTableSlot *slot;
556 List *publicationsCopy = NIL;
557 Oid tableRow[1] = {TEXTOID};
558
559 initStringInfo(&cmd);
560 appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
561 " pg_catalog.pg_publication t WHERE\n"
562 " t.pubname IN (");
563 GetPublicationsStr(publications, &cmd, true);
564 appendStringInfoChar(&cmd, ')');
565
566 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
567 pfree(cmd.data);
568
569 if (res->status != WALRCV_OK_TUPLES)
570 ereport(ERROR,
571 errmsg("could not receive list of publications from the publisher: %s",
572 res->err));
573
574 publicationsCopy = list_copy(publications);
575
576 /* Process publication(s). */
577 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
578 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
579 {
580 char *pubname;
581 bool isnull;
582
583 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
584 Assert(!isnull);
585
586 /* Delete the publication present in publisher from the list. */
587 publicationsCopy = list_delete(publicationsCopy, makeString(pubname));
588 ExecClearTuple(slot);
589 }
590
591 ExecDropSingleTupleTableSlot(slot);
592
593 walrcv_clear_result(res);
594
595 if (list_length(publicationsCopy))
596 {
597 /* Prepare the list of non-existent publication(s) for error message. */
598 StringInfoData pubnames;
599
600 initStringInfo(&pubnames);
601
602 GetPublicationsStr(publicationsCopy, &pubnames, false);
603 ereport(WARNING,
604 errcode(ERRCODE_UNDEFINED_OBJECT),
605 errmsg_plural("publication %s does not exist on the publisher",
606 "publications %s do not exist on the publisher",
607 list_length(publicationsCopy),
608 pubnames.data));
609 }
610}
611
612/*
613 * Auxiliary function to build a text array out of a list of String nodes.
614 */
615static Datum
616publicationListToArray(List *publist)
617{
618 ArrayType *arr;
619 Datum *datums;
620 MemoryContext memcxt;
621 MemoryContext oldcxt;
622
623 /* Create memory context for temporary allocations. */
624 memcxt = AllocSetContextCreate(CurrentMemoryContext,
625 "publicationListToArray to array",
626 ALLOCSET_DEFAULT_SIZES);
627 oldcxt = MemoryContextSwitchTo(memcxt);
628
629 datums = palloc_array(Datum, list_length(publist));
630
631 check_duplicates_in_publist(publist, datums);
632
633 MemoryContextSwitchTo(oldcxt);
634
635 arr = construct_array_builtin(datums, list_length(publist), TEXTOID);
636
637 MemoryContextDelete(memcxt);
638
639 return PointerGetDatum(arr);
640}
641
642/*
643 * Create new subscription.
644 */
645ObjectAddress
646CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
647 bool isTopLevel)
648{
649 Relation rel;
650 ObjectAddress myself;
651 Oid subid;
652 bool nulls[Natts_pg_subscription];
653 Datum values[Natts_pg_subscription];
654 Oid owner = GetUserId();
655 HeapTuple tup;
656 Oid serverid;
657 char *conninfo;
658 char originname[NAMEDATALEN];
659 List *publications;
660 uint32 supported_opts;
661 SubOpts opts = {0};
662 AclResult aclresult;
663
664 /*
665 * Parse and check options.
666 *
667 * Connection and publication should not be specified here.
668 */
669 supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
670 SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
671 SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
672 SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
673 SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
674 SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
675 SUBOPT_RETAIN_DEAD_TUPLES |
676 SUBOPT_MAX_RETENTION_DURATION |
677 SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN);
678 parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
679
680 /*
681 * Since creating a replication slot is not transactional, rolling back
682 * the transaction leaves the created replication slot. So we cannot run
683 * CREATE SUBSCRIPTION inside a transaction block if creating a
684 * replication slot.
685 */
686 if (opts.create_slot)
687 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
688
689 /*
690 * We don't want to allow unprivileged users to be able to trigger
691 * attempts to access arbitrary network destinations, so require the user
692 * to have been specifically authorized to create subscriptions.
693 */
694 if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
695 ereport(ERROR,
696 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
697 errmsg("permission denied to create subscription"),
698 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
699 "pg_create_subscription")));
700
701 /*
702 * Since a subscription is a database object, we also check for CREATE
703 * permission on the database.
704 */
706 owner, ACL_CREATE);
707 if (aclresult != ACLCHECK_OK)
710
711 /*
712 * Non-superusers are required to set a password for authentication, and
713 * that password must be used by the target server, but the superuser can
714 * exempt a subscription from this requirement.
715 */
716 if (!opts.passwordrequired && !superuser_arg(owner))
719 errmsg("password_required=false is superuser-only"),
720 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
721
722 /*
723 * If built with appropriate switch, whine when regression-testing
724 * conventions for subscription names are violated.
725 */
726#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
727 if (strncmp(stmt->subname, "regress_", 8) != 0)
728 elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
729#endif
730
732
733 /* Check if name is used */
736 if (OidIsValid(subid))
737 {
740 errmsg("subscription \"%s\" already exists",
741 stmt->subname)));
742 }
743
744 /*
745 * Ensure that system configuration parameters are set appropriately to
746 * support retain_dead_tuples and max_retention_duration.
747 */
749 opts.retaindeadtuples, opts.retaindeadtuples,
750 (opts.maxretention > 0));
751
752 if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
753 opts.slot_name == NULL)
754 opts.slot_name = stmt->subname;
755
756 /* The default for synchronous_commit of subscriptions is off. */
757 if (opts.synchronous_commit == NULL)
758 opts.synchronous_commit = "off";
759
760 /*
761 * The default for wal_receiver_timeout of subscriptions is -1, which
762 * means the value is inherited from the server configuration, command
763 * line, or role/database settings.
764 */
765 if (opts.wal_receiver_timeout == NULL)
766 opts.wal_receiver_timeout = "-1";
767
768 /* Load the library providing us libpq calls. */
769 load_file("libpqwalreceiver", false);
770
771 if (stmt->servername)
772 {
773 ForeignServer *server;
774
775 Assert(!stmt->conninfo);
776 conninfo = NULL;
777
778 server = GetForeignServerByName(stmt->servername, false);
780 if (aclresult != ACLCHECK_OK)
782
783 /* make sure a user mapping exists */
784 GetUserMapping(owner, server->serverid);
785
786 serverid = server->serverid;
787 conninfo = ForeignServerConnectionString(owner, server);
788 }
789 else
790 {
791 Assert(stmt->conninfo);
792
793 serverid = InvalidOid;
794 conninfo = stmt->conninfo;
795 }
796
797 /* Check the connection info string. */
798 walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
799
800 publications = stmt->publication;
801
802 /* Everything ok, form a new tuple. */
803 memset(values, 0, sizeof(values));
804 memset(nulls, false, sizeof(nulls));
805
818 CharGetDatum(opts.twophase ?
826 BoolGetDatum(opts.retaindeadtuples);
828 Int32GetDatum(opts.maxretention);
830 BoolGetDatum(opts.retaindeadtuples);
832 if (!OidIsValid(serverid))
834 CStringGetTextDatum(conninfo);
835 else
836 nulls[Anum_pg_subscription_subconninfo - 1] = true;
837 if (opts.slot_name)
840 else
841 nulls[Anum_pg_subscription_subslotname - 1] = true;
843 CStringGetTextDatum(opts.synchronous_commit);
845 CStringGetTextDatum(opts.wal_receiver_timeout);
847 publicationListToArray(publications);
850
852
853 /* Insert tuple into catalog. */
856
858
860
861 if (stmt->servername)
862 {
864
865 Assert(OidIsValid(serverid));
866
869 }
870
871 /*
872 * A replication origin is currently created for all subscriptions,
873 * including those that only contain sequences or are otherwise empty.
874 *
875 * XXX: While this is technically unnecessary, optimizing it would require
876 * additional logic to skip origin creation during DDL operations and
877 * apply workers initialization, and to handle origin creation dynamically
878 * when tables are added to the subscription. It is not clear whether
879 * preventing creation of origins is worth additional complexity.
880 */
883
884 /*
885 * Connect to remote side to execute requested commands and fetch table
886 * and sequence info.
887 */
888 if (opts.connect)
889 {
890 char *err;
893
894 /* Try to connect to the publisher. */
895 must_use_password = !superuser_arg(owner) && opts.passwordrequired;
896 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
897 stmt->subname, &err);
898 if (!wrconn)
901 errmsg("subscription \"%s\" could not connect to the publisher: %s",
902 stmt->subname, err)));
903
904 PG_TRY();
905 {
906 bool has_tables = false;
907 List *pubrels;
908 char relation_state;
909
910 check_publications(wrconn, publications);
912 opts.copy_data,
913 opts.retaindeadtuples, opts.origin,
914 NULL, 0, stmt->subname);
916 opts.copy_data, opts.origin,
917 NULL, 0, stmt->subname);
918
919 if (opts.retaindeadtuples)
921
922 /*
923 * Set sync state based on if we were asked to do data copy or
924 * not.
925 */
927
928 /*
929 * Build local relation status info. Relations are for both tables
930 * and sequences from the publisher.
931 */
932 pubrels = fetch_relation_list(wrconn, publications);
933
935 {
936 Oid relid;
937 char relkind;
938 RangeVar *rv = pubrelinfo->rv;
939
940 relid = RangeVarGetRelid(rv, AccessShareLock, false);
941 relkind = get_rel_relkind(relid);
942
943 /* Check for supported relkind. */
944 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
945 rv->schemaname, rv->relname);
946 has_tables |= (relkind != RELKIND_SEQUENCE);
948 InvalidXLogRecPtr, true);
949 }
950
951 /*
952 * If requested, create permanent slot for the subscription. We
953 * won't use the initial snapshot for anything, so no need to
954 * export it.
955 *
956 * XXX: Similar to origins, it is not clear whether preventing the
957 * slot creation for empty and sequence-only subscriptions is
958 * worth additional complexity.
959 */
960 if (opts.create_slot)
961 {
962 bool twophase_enabled = false;
963
964 Assert(opts.slot_name);
965
966 /*
967 * Even if two_phase is set, don't create the slot with
968 * two-phase enabled. Will enable it once all the tables are
969 * synced and ready. This avoids race-conditions like prepared
970 * transactions being skipped due to changes not being applied
971 * due to checks in should_apply_changes_for_rel() when
972 * tablesync for the corresponding tables are in progress. See
973 * comments atop worker.c.
974 *
975 * Note that if tables were specified but copy_data is false
976 * then it is safe to enable two_phase up-front because those
977 * tables are already initially in READY state. When the
978 * subscription has no tables, we leave the twophase state as
979 * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
980 * PUBLICATION to work.
981 */
982 if (opts.twophase && !opts.copy_data && has_tables)
983 twophase_enabled = true;
984
986 opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
987
990
992 (errmsg("created replication slot \"%s\" on publisher",
993 opts.slot_name)));
994 }
995 }
996 PG_FINALLY();
997 {
999 }
1000 PG_END_TRY();
1001 }
1002 else
1004 (errmsg("subscription was created, but is not connected"),
1005 errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
1006
1008
1010
1011 /*
1012 * Notify the launcher to start the apply worker if the subscription is
1013 * enabled, or to create the conflict detection slot if retain_dead_tuples
1014 * is enabled.
1015 *
1016 * Creating the conflict detection slot is essential even when the
1017 * subscription is not enabled. This ensures that dead tuples are
1018 * retained, which is necessary for accurately identifying the type of
1019 * conflict during replication.
1020 */
1021 if (opts.enabled || opts.retaindeadtuples)
1023
1025
1026 return myself;
1027}
1028
1029static void
1032{
1033 char *err;
1034 List *pubrels = NIL;
1040 int subrel_count;
1041 ListCell *lc;
1042 int off;
1043 int tbl_count = 0;
1044 int seq_count = 0;
1045 Relation rel = NULL;
1046 typedef struct SubRemoveRels
1047 {
1048 Oid relid;
1049 char state;
1050 } SubRemoveRels;
1051
1053 bool must_use_password;
1054
1055 /* Load the library providing us libpq calls. */
1056 load_file("libpqwalreceiver", false);
1057
1058 /* Try to connect to the publisher. */
1060 wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1061 sub->name, &err);
1062 if (!wrconn)
1063 ereport(ERROR,
1065 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1066 sub->name, err)));
1067
1068 PG_TRY();
1069 {
1072
1073 /* Get the relation list from publisher. */
1075
1076 /* Get local relation list. */
1077 subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
1079
1080 /*
1081 * Build qsorted arrays of local table oids and sequence oids for
1082 * faster lookup. This can potentially contain all tables and
1083 * sequences in the database so speed of lookup is important.
1084 *
1085 * We do not yet know the exact count of tables and sequences, so we
1086 * allocate separate arrays for table OIDs and sequence OIDs based on
1087 * the total number of relations (subrel_count).
1088 */
1091 foreach(lc, subrel_states)
1092 {
1094
1095 if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
1096 subseq_local_oids[seq_count++] = relstate->relid;
1097 else
1098 subrel_local_oids[tbl_count++] = relstate->relid;
1099 }
1100
1103 sub->retaindeadtuples, sub->origin,
1105 sub->name);
1106
1109 copy_data, sub->origin,
1111 sub->name);
1112
1113 /*
1114 * Walk over the remote relations and try to match them to locally
1115 * known relations. If the relation is not known locally create a new
1116 * state for it.
1117 *
1118 * Also builds array of local oids of remote relations for the next
1119 * step.
1120 */
1121 off = 0;
1123
1125 {
1126 RangeVar *rv = pubrelinfo->rv;
1127 Oid relid;
1128 char relkind;
1129
1130 relid = RangeVarGetRelid(rv, AccessShareLock, false);
1131 relkind = get_rel_relkind(relid);
1132
1133 /* Check for supported relkind. */
1134 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
1135 rv->schemaname, rv->relname);
1136
1137 pubrel_local_oids[off++] = relid;
1138
1139 if (!bsearch(&relid, subrel_local_oids,
1140 tbl_count, sizeof(Oid), oid_cmp) &&
1141 !bsearch(&relid, subseq_local_oids,
1142 seq_count, sizeof(Oid), oid_cmp))
1143 {
1144 AddSubscriptionRelState(sub->oid, relid,
1146 InvalidXLogRecPtr, true);
1148 errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
1149 relkind == RELKIND_SEQUENCE ? "sequence" : "table",
1150 rv->schemaname, rv->relname, sub->name));
1151 }
1152 }
1153
1154 /*
1155 * Next remove state for tables we should not care about anymore using
1156 * the data we collected above
1157 */
1159
1160 for (off = 0; off < tbl_count; off++)
1161 {
1162 Oid relid = subrel_local_oids[off];
1163
1164 if (!bsearch(&relid, pubrel_local_oids,
1165 list_length(pubrels), sizeof(Oid), oid_cmp))
1166 {
1167 char state;
1168 XLogRecPtr statelsn;
1170
1171 /*
1172 * Lock pg_subscription_rel with AccessExclusiveLock to
1173 * prevent any race conditions with the apply worker
1174 * re-launching workers at the same time this code is trying
1175 * to remove those tables.
1176 *
1177 * Even if new worker for this particular rel is restarted it
1178 * won't be able to make any progress as we hold exclusive
1179 * lock on pg_subscription_rel till the transaction end. It
1180 * will simply exit as there is no corresponding rel entry.
1181 *
1182 * This locking also ensures that the state of rels won't
1183 * change till we are done with this refresh operation.
1184 */
1185 if (!rel)
1187
1188 /* Last known rel state. */
1189 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
1190
1191 RemoveSubscriptionRel(sub->oid, relid);
1192
1193 remove_rel->relid = relid;
1194 remove_rel->state = state;
1195
1197
1199
1200 /*
1201 * For READY state, we would have already dropped the
1202 * tablesync origin.
1203 */
1205 {
1206 char originname[NAMEDATALEN];
1207
1208 /*
1209 * Drop the tablesync's origin tracking if exists.
1210 *
1211 * It is possible that the origin is not yet created for
1212 * tablesync worker, this can happen for the states before
1213 * SUBREL_STATE_DATASYNC. The tablesync worker or apply
1214 * worker can also concurrently try to drop the origin and
1215 * by this time the origin might be already removed. For
1216 * these reasons, passing missing_ok = true.
1217 */
1219 sizeof(originname));
1220 replorigin_drop_by_name(originname, true, false);
1221 }
1222
1224 (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1226 get_rel_name(relid),
1227 sub->name)));
1228 }
1229 }
1230
1231 /*
1232 * Drop the tablesync slots associated with removed tables. This has
1233 * to be at the end because otherwise if there is an error while doing
1234 * the database operations we won't be able to rollback dropped slots.
1235 */
1237 {
1238 if (sub_remove_rel->state != SUBREL_STATE_READY &&
1240 {
1241 char syncslotname[NAMEDATALEN] = {0};
1242
1243 /*
1244 * For READY/SYNCDONE states we know the tablesync slot has
1245 * already been dropped by the tablesync worker.
1246 *
1247 * For other states, there is no certainty, maybe the slot
1248 * does not exist yet. Also, if we fail after removing some of
1249 * the slots, next time, it will again try to drop already
1250 * dropped slots and fail. For these reasons, we allow
1251 * missing_ok = true for the drop.
1252 */
1254 syncslotname, sizeof(syncslotname));
1256 }
1257 }
1258
1259 /*
1260 * Next remove state for sequences we should not care about anymore
1261 * using the data we collected above
1262 */
1263 for (off = 0; off < seq_count; off++)
1264 {
1265 Oid relid = subseq_local_oids[off];
1266
1267 if (!bsearch(&relid, pubrel_local_oids,
1268 list_length(pubrels), sizeof(Oid), oid_cmp))
1269 {
1270 /*
1271 * This locking ensures that the state of rels won't change
1272 * till we are done with this refresh operation.
1273 */
1274 if (!rel)
1276
1277 RemoveSubscriptionRel(sub->oid, relid);
1278
1280 errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
1282 get_rel_name(relid),
1283 sub->name));
1284 }
1285 }
1286 }
1287 PG_FINALLY();
1288 {
1290 }
1291 PG_END_TRY();
1292
1293 if (rel)
1294 table_close(rel, NoLock);
1295}
1296
1297/*
1298 * Connects to the publisher and marks all sequences of the subscription
 * with INIT state.
 *
 * An ERROR is raised if the connection to the publisher cannot be
 * established.
1299 */
1300static void
1302{
1303 char *err = NULL;
1305 bool must_use_password;
1306
1307 /* Load the library that provides the libpq calls. */
1308 load_file("libpqwalreceiver", false);
1309
1310 /* Try to connect to the publisher; a failure is reported just below. */
1312 wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1313 sub->name, &err);
1314 if (!wrconn)
1315 ereport(ERROR,
1317 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1318 sub->name, err));
1319
1320 PG_TRY();
1321 {
1323
1325 sub->origin, NULL, 0, sub->name);
1326
1327 /* Get the local list of sequences tracked for this subscription. */
1328 subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
1330 {
1331 Oid relid = subrel->relid;
1332
1334 InvalidXLogRecPtr, false);
1336 errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
1338 get_rel_name(relid),
1339 sub->name));
1340 }
1341 }
1342 PG_FINALLY();
1343 {
1345 }
1346 PG_END_TRY();
1347}
1348
1349/*
1350 * Common checks for altering failover, two_phase, and retain_dead_tuples
1351 * options.
 *
 * 'option' must be one of the three option names above (asserted). An ERROR
 * is raised if the subscription is enabled; the slot-related check further
 * down raises an ERROR if the subscription has no slot name.
1352 */
1353static void
1355 bool slot_needs_update, bool isTopLevel)
1356{
1357 Assert(strcmp(option, "failover") == 0 ||
1358 strcmp(option, "two_phase") == 0 ||
1359 strcmp(option, "retain_dead_tuples") == 0);
1360
1361 /*
1362 * Altering the retain_dead_tuples option does not update the slot on the
1363 * publisher.
1364 */
1365 Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1366
1367 /*
1368 * Do not allow changing the option if the subscription is enabled. This
1369 * is because both failover and two_phase options of the slot on the
1370 * publisher cannot be modified if the slot is currently acquired by the
1371 * existing walsender.
1372 *
1373 * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1374 * the publisher by the existing walsender, so we could have allowed that
1375 * even when the subscription is enabled. But we kept this restriction for
1376 * the sake of consistency and simplicity.
1377 *
1378 * Additionally, do not allow changing the retain_dead_tuples option when
1379 * the subscription is enabled to prevent race conditions arising from the
1380 * new option value being acknowledged asynchronously by the launcher and
1381 * apply workers.
1382 *
1383 * Without the restriction, a race condition may arise when a user
1384 * disables and immediately re-enables the retain_dead_tuples option. In
1385 * this case, the launcher might drop the slot upon noticing the disabled
1386 * action, while the apply worker may keep maintaining
1387 * oldest_nonremovable_xid without noticing the option change. During this
1388 * period, a transaction ID wraparound could falsely make this ID appear
1389 * as if it originates from the future w.r.t. the transaction ID stored in
1390 * the slot maintained by the launcher.
1391 *
1392 * Similarly, if the user enables retain_dead_tuples concurrently with the
1393 * launcher starting the worker, the apply worker may start calculating
1394 * oldest_nonremovable_xid before the launcher notices the enable action.
1395 * Consequently, the launcher may update slot.xmin to a newer value than
1396 * that maintained by the worker. In subsequent cycles, upon integrating
1397 * the worker's oldest_nonremovable_xid, the launcher might detect a
1398 * retreat in the calculated xmin, necessitating additional handling.
1399 *
1400 * XXX To address the above race conditions, we can define
1401 * oldest_nonremovable_xid as FullTransactionId and add a check to
1402 * disallow retreating the conflict slot's xmin. For now, we kept the
1403 * implementation simple by disallowing changes to retain_dead_tuples,
1404 * but in the future we can change this after some more analysis.
1405 *
1406 * Note that we could restrict only the enabling of retain_dead_tuples to
1407 * avoid the race conditions described above, but we maintain the
1408 * restriction for both enable and disable operations for the sake of
1409 * consistency.
1410 */
1411 if (sub->enabled)
1412 ereport(ERROR,
1414 errmsg("cannot set option \"%s\" for enabled subscription",
1415 option)));
1416
1418 {
1419 StringInfoData cmd;
1420
1421 /*
1422 * A valid slot must be associated with the subscription for us to
1423 * modify any of the slot's properties.
1424 */
1425 if (!sub->slotname)
1426 ereport(ERROR,
1428 errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1429 option)));
1430
1431 /* The changed option of the slot can't be rolled back. */
1432 initStringInfo(&cmd);
1433 appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1434
1436 pfree(cmd.data);
1437 }
1438}
1439
1440/*
1441 * Alter an existing subscription.
 *
 * The work is driven by stmt->kind: the options are parsed and checked
 * first, the replacement catalog values are collected next, and any
 * connection to the publisher is attempted only at the very end.
1442 */
1445 bool isTopLevel)
1446{
1447 Relation rel;
1449 bool nulls[Natts_pg_subscription];
1452 HeapTuple tup;
1453 Oid subid;
1454 bool orig_conninfo_needed = true;
1455 bool update_tuple = false;
1456 bool update_failover = false;
1457 bool update_two_phase = false;
1458 bool check_pub_rdt = false;
1459 bool retain_dead_tuples;
1460 int max_retention;
1461 bool retention_active;
1462 char *new_conninfo = NULL;
1463 char *origin;
1464 Subscription *sub;
1467 SubOpts opts = {0};
1468
1470
1471 /* Fetch the existing tuple. */
1473 CStringGetDatum(stmt->subname));
1474
1475 if (!HeapTupleIsValid(tup))
1476 ereport(ERROR,
1478 errmsg("subscription \"%s\" does not exist",
1479 stmt->subname)));
1480
1482 subid = form->oid;
1483
1484 /* must be owner */
1487 stmt->subname);
1488
1489 /* parse and check options */
1490 switch (stmt->kind)
1491 {
1503 break;
1504
1507 break;
1508
1511 break;
1512
1516 break;
1517
1520 break;
1521
1524 break;
1525
1526 default:
1527 supported_opts = 0;
1528 break;
1529 }
1530
1531 if (supported_opts > 0)
1533
1534 /*
1535 * Ensure that ALTER SUBSCRIPTION commands that could be used to fix a
1536 * broken connection or prepare to drop a broken subscription don't
1537 * attempt to construct the conninfo. Otherwise, we might encounter the
1538 * error the user is trying to fix.
1539 *
1540 * Specifically, ALTER SUBSCRIPTION DISABLE, ALTER SUBSCRIPTION SERVER,
1541 * ALTER SUBSCRIPTION CONNECTION, or ALTER SUBSCRIPTION SET
1542 * (slot_name=NONE).
1543 *
1544 * NB: if the user specifies multiple SET options, then we may still need
1545 * to construct conninfo even if slot_name is set to NONE.
1546 */
1547 if (stmt->kind == ALTER_SUBSCRIPTION_ENABLED)
1548 {
1549 if (opts.specified_opts == SUBOPT_ENABLED && !opts.enabled)
1550 orig_conninfo_needed = false;
1551 }
1552 else if (stmt->kind == ALTER_SUBSCRIPTION_SERVER ||
1554 {
1555 orig_conninfo_needed = false;
1556 }
1557 else if (stmt->kind == ALTER_SUBSCRIPTION_OPTIONS)
1558 {
1559 /* ... SET (slot_name = NONE) with no other options */
1560 if (opts.specified_opts == SUBOPT_SLOT_NAME && !opts.slot_name)
1561 orig_conninfo_needed = false;
1562 }
1563
1564 /*
1565 * Skip ACL checks on the subscription's foreign server, if any. If
1566 * changing the server (or replacing it with a raw connection), then the
1567 * old one will be removed anyway. If changing something unrelated,
1568 * there's no need to do an additional ACL check here; that will be done
1569 * by the subscription worker.
1570 */
1571 sub = GetSubscription(subid, false, orig_conninfo_needed, false);
1572
1574 origin = sub->origin;
1577
1578 /*
1579 * Don't allow non-superuser modification of a subscription with
1580 * password_required=false.
1581 */
1582 if (!sub->passwordrequired && !superuser())
1583 ereport(ERROR,
1585 errmsg("password_required=false is superuser-only"),
1586 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1587
1588 /* Lock the subscription so nobody else can do anything with it. */
1590
1591 /* Form a new tuple. */
1592 memset(values, 0, sizeof(values));
1593 memset(nulls, false, sizeof(nulls));
1594 memset(replaces, false, sizeof(replaces));
1595
1597
1598 switch (stmt->kind)
1599 {
1601 {
1602 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1603 {
1604 /*
1605 * The subscription must be disabled to allow slot_name as
1606 * 'none'; otherwise, the apply worker would repeatedly try
1607 * to stream the data using a slot_name that neither
1608 * exists on the publisher nor may be created there by the
1609 * user.
1610 */
1611 if (sub->enabled && !opts.slot_name)
1612 ereport(ERROR,
1614 errmsg("cannot set %s for enabled subscription",
1615 "slot_name = NONE")));
1616
1617 if (opts.slot_name)
1620 else
1621 nulls[Anum_pg_subscription_subslotname - 1] = true;
1623 }
1624
1625 if (opts.synchronous_commit)
1626 {
1628 CStringGetTextDatum(opts.synchronous_commit);
1630 }
1631
1632 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1633 {
1635 BoolGetDatum(opts.binary);
1637 }
1638
1639 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1640 {
1642 CharGetDatum(opts.streaming);
1644 }
1645
1646 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1647 {
1649 = BoolGetDatum(opts.disableonerr);
1651 = true;
1652 }
1653
1654 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1655 {
1656 /* Non-superuser may not disable password_required. */
1657 if (!opts.passwordrequired && !superuser())
1658 ereport(ERROR,
1660 errmsg("password_required=false is superuser-only"),
1661 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1662
1664 = BoolGetDatum(opts.passwordrequired);
1666 = true;
1667 }
1668
1669 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1670 {
1672 BoolGetDatum(opts.runasowner);
1674 }
1675
1676 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1677 {
1678 /*
1679 * We need to update both the slot and the subscription
1680 * for the two_phase option. We can enable the two_phase
1681 * option for a slot only once the initial data
1682 * synchronization is done. This is to avoid missing some
1683 * data as explained in comments atop worker.c.
 *
 * Consequently, only disabling two_phase is flagged as a
 * slot update here.
1684 */
1685 update_two_phase = !opts.twophase;
1686
1687 CheckAlterSubOption(sub, "two_phase", update_two_phase,
1688 isTopLevel);
1689
1690 /*
1691 * Modifying the two_phase slot option requires a slot
1692 * lookup by slot name, so changing the slot name at the
1693 * same time is not allowed.
1694 */
1695 if (update_two_phase &&
1696 IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1697 ereport(ERROR,
1699 errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1700
1701 /*
1702 * Note that workers may still survive even if the
1703 * subscription has been disabled.
1704 *
1705 * Ensure workers have already exited to avoid
1706 * getting prepared transactions while we are disabling
1707 * the two_phase option. Otherwise, the changes of an
1708 * already prepared transaction can be replicated again
1709 * along with its corresponding commit, leading to
1710 * duplicate data or errors.
1711 */
1712 if (logicalrep_workers_find(subid, true, true))
1713 ereport(ERROR,
1715 errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1716 errhint("Try again after some time.")));
1717
1718 /*
1719 * two_phase cannot be disabled if there are any
1720 * uncommitted prepared transactions present; otherwise it
1721 * can lead to duplicate data or errors as explained in
1722 * the comment above.
1723 */
1724 if (update_two_phase &&
1726 LookupGXactBySubid(subid))
1727 ereport(ERROR,
1729 errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1730 errhint("Resolve these transactions and try again.")));
1731
1732 /* Change system catalog accordingly */
1734 CharGetDatum(opts.twophase ?
1738 }
1739
1740 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1741 {
1742 /*
1743 * Similar to the two_phase case above, we need to update
1744 * the failover option for both the slot and the
1745 * subscription.
1746 */
1747 update_failover = true;
1748
1749 CheckAlterSubOption(sub, "failover", update_failover,
1750 isTopLevel);
1751
1753 BoolGetDatum(opts.failover);
1755 }
1756
1757 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1758 {
1760 BoolGetDatum(opts.retaindeadtuples);
1762
1763 /*
1764 * Update the retention status only if there's a change in
1765 * the retain_dead_tuples option value.
1766 *
1767 * Automatically marking retention as active when
1768 * retain_dead_tuples is enabled may not always be ideal,
1769 * especially if retention was previously stopped and the
1770 * user toggles retain_dead_tuples without adjusting the
1771 * publisher workload. However, this behavior provides a
1772 * convenient way for users to manually refresh the
1773 * retention status. Since retention will be stopped again
1774 * unless the publisher workload is reduced, this approach
1775 * is acceptable for now.
1776 */
1777 if (opts.retaindeadtuples != sub->retaindeadtuples)
1778 {
1780 BoolGetDatum(opts.retaindeadtuples);
1782
1783 retention_active = opts.retaindeadtuples;
1784 }
1785
1786 CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1787
1788 /*
1789 * Workers may continue running even after the
1790 * subscription has been disabled.
1791 *
1792 * To prevent race conditions (as described in
1793 * CheckAlterSubOption()), ensure that all worker
1794 * processes have already exited before proceeding.
1795 */
1796 if (logicalrep_workers_find(subid, true, true))
1797 ereport(ERROR,
1799 errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1800 errhint("Try again after some time.")));
1801
1802 /*
1803 * Notify the launcher to manage the replication slot for
1804 * conflict detection. This ensures that the replication slot
1805 * is efficiently handled (created, updated, or dropped)
1806 * in response to any configuration changes.
1807 */
1809
1810 check_pub_rdt = opts.retaindeadtuples;
1811 retain_dead_tuples = opts.retaindeadtuples;
1812 }
1813
1814 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1815 {
1817 Int32GetDatum(opts.maxretention);
1819
1820 max_retention = opts.maxretention;
1821 }
1822
1823 /*
1824 * Ensure that system configuration parameters are set
1825 * appropriately to support retain_dead_tuples and
1826 * max_retention_duration.
1827 */
1828 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1829 IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1833 (max_retention > 0));
1834
1835 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1836 {
1838 CStringGetTextDatum(opts.origin);
1840
1841 /*
1842 * Check if changes from different origins may be received
1843 * from the publisher when the origin is changed to ANY
1844 * and retain_dead_tuples is enabled. Use |= so that we
1845 * don't clear the flag already set when
1846 * retain_dead_tuples was changed in the same command.
1847 */
1850
1851 origin = opts.origin;
1852 }
1853
1854 if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
1855 {
1857 CStringGetTextDatum(opts.wal_receiver_timeout);
1859 }
1860
1861 update_tuple = true;
1862 break;
1863 }
1864
1866 {
1867 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1868
1869 if (!sub->slotname && opts.enabled)
1870 ereport(ERROR,
1872 errmsg("cannot enable subscription that does not have a slot name")));
1873
1874 /*
1875 * Check track_commit_timestamp only when enabling the
1876 * subscription in case it was disabled after creation. See
1877 * comments atop CheckSubDeadTupleRetention() for details.
1878 */
1879 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1881 sub->retentionactive, false);
1882
1884 BoolGetDatum(opts.enabled);
1886
1887 if (opts.enabled)
1889
1890 update_tuple = true;
1891
1892 /*
1893 * The subscription might be initially created with
1894 * connect=false and retain_dead_tuples=true, meaning the
1895 * remote server's status may not be checked. Ensure this
1896 * check is conducted now.
1897 */
1898 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1899 break;
1900 }
1901
1903 {
1907
1908 /*
1909 * Remove what was there before, either another foreign server
1910 * or a connection string.
1911 */
1912 if (form->subserver)
1913 {
1916 ForeignServerRelationId, form->subserver);
1917 }
1918 else
1919 {
1920 nulls[Anum_pg_subscription_subconninfo - 1] = true;
1922 }
1923
1924 /*
1925 * Check that the subscription owner has USAGE privileges on
1926 * the server.
1927 */
1928 new_server = GetForeignServerByName(stmt->servername, false);
1930 new_server->serverid,
1931 form->subowner, ACL_USAGE);
1932 if (aclresult != ACLCHECK_OK)
1933 ereport(ERROR,
1935 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
1936 GetUserNameFromId(form->subowner, false),
1937 new_server->servername));
1938
1939 /* make sure a user mapping exists */
1940 GetUserMapping(form->subowner, new_server->serverid);
1941
1943 new_server);
1944
1945 /* Load the library providing us libpq calls. */
1946 load_file("libpqwalreceiver", false);
1947 /* Check the connection info string. */
1949 sub->passwordrequired && !sub->ownersuperuser);
1950
1953
1956
1957 update_tuple = true;
1958 }
1959
1960 /*
1961 * Since the remote server configuration might have changed,
1962 * perform a check to ensure it permits enabling
1963 * retain_dead_tuples.
1964 */
1966 break;
1967
1969 /* remove reference to foreign server and dependencies, if present */
1970 if (form->subserver)
1971 {
1974 ForeignServerRelationId, form->subserver);
1975
1978 }
1979
1980 new_conninfo = stmt->conninfo;
1981
1982 /* Load the library providing us libpq calls. */
1983 load_file("libpqwalreceiver", false);
1984 /* Check the connection info string. */
1986 sub->passwordrequired && !sub->ownersuperuser);
1987
1989 CStringGetTextDatum(stmt->conninfo);
1991 update_tuple = true;
1992
1993 /*
1994 * Since the remote server configuration might have changed,
1995 * perform a check to ensure it permits enabling
1996 * retain_dead_tuples.
1997 */
1999 break;
2000
2002 {
2004 publicationListToArray(stmt->publication);
2006
2007 update_tuple = true;
2008
2009 /* Refresh if user asked us to. */
2010 if (opts.refresh)
2011 {
2012 if (!sub->enabled)
2013 ereport(ERROR,
2015 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
2016 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
2017
2018 /*
2019 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
2020 * why this is not allowed.
2021 */
2022 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
2023 ereport(ERROR,
2025 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
2026 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
2027
2028 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
2029
2030 /* Make sure refresh sees the new list of publications. */
2031 sub->publications = stmt->publication;
2032
2033 AlterSubscription_refresh(sub, opts.copy_data,
2034 stmt->publication);
2035 }
2036
2037 break;
2038 }
2039
2042 {
2043 List *publist;
2045
2046 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
2050
2051 update_tuple = true;
2052
2053 /* Refresh if user asked us to. */
2054 if (opts.refresh)
2055 {
2056 /* We only need to validate user specified publications. */
2057 List *validate_publications = (isadd) ? stmt->publication : NULL;
2058
2059 if (!sub->enabled)
2060 ereport(ERROR,
2062 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
2063 /* translator: %s is an SQL ALTER command */
2064 errhint("Use %s instead.",
2065 isadd ?
2066 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
2067 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
2068
2069 /*
2070 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
2071 * why this is not allowed.
2072 */
2073 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
2074 ereport(ERROR,
2076 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
2077 /* translator: %s is an SQL ALTER command */
2078 errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
2079 isadd ?
2080 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
2081 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
2082
2083 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
2084
2085 /* Refresh the new list of publications. */
2086 sub->publications = publist;
2087
2088 AlterSubscription_refresh(sub, opts.copy_data,
2090 }
2091
2092 break;
2093 }
2094
2096 {
2097 if (!sub->enabled)
2098 ereport(ERROR,
2100 errmsg("%s is not allowed for disabled subscriptions",
2101 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
2102
2103 /*
2104 * The subscription option "two_phase" requires that
2105 * replication has passed the initial table synchronization
2106 * phase before the two_phase becomes properly enabled.
2107 *
2108 * But, having reached this two-phase commit "enabled" state
2109 * we must not allow any subsequent table initialization to
2110 * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
2111 * disallowed when the user had requested two_phase = on mode.
2112 *
2113 * The exception to this restriction is when copy_data =
2114 * false, because when copy_data is false the tablesync will
2115 * start already in READY state and will exit directly without
2116 * doing anything.
2117 *
2118 * For more details see comments atop worker.c.
2119 */
2120 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
2121 ereport(ERROR,
2123 errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
2124 errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
2125
2126 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
2127
2128 AlterSubscription_refresh(sub, opts.copy_data, NULL);
2129
2130 break;
2131 }
2132
2134 {
2135 if (!sub->enabled)
2136 ereport(ERROR,
2138 errmsg("%s is not allowed for disabled subscriptions",
2139 "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
2140
2142
2143 break;
2144 }
2145
2147 {
2148 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
2149 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
2150
2151 /*
2152 * If the user sets subskiplsn, we do a sanity check to make
2153 * sure that the specified LSN is a probable value.
2154 */
2155 if (XLogRecPtrIsValid(opts.lsn))
2156 {
2158 char originname[NAMEDATALEN];
2159 XLogRecPtr remote_lsn;
2160
2162 originname, sizeof(originname));
2164 remote_lsn = replorigin_get_progress(originid, false);
2165
2166 /* Reject an LSN that is behind the origin's current progress */
2167 if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
2168 ereport(ERROR,
2170 errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
2171 LSN_FORMAT_ARGS(opts.lsn),
2172 LSN_FORMAT_ARGS(remote_lsn))));
2173 }
2174
2177
2178 update_tuple = true;
2179 break;
2180 }
2181
2182 default:
2183 elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
2184 stmt->kind);
2185 }
2186
2187 /* Update the catalog if needed. */
2188 if (update_tuple)
2189 {
2191 replaces);
2192
2193 CatalogTupleUpdate(rel, &tup->t_self, tup);
2194
2196 }
2197
2198 /*
2199 * Try to acquire the connection necessary either for modifying the slot
2200 * or for checking if the remote server permits enabling
2201 * retain_dead_tuples.
2202 *
2203 * This has to be at the end because otherwise if there is an error while
2204 * doing the database operations we won't be able to roll back the altered
2205 * slot.
2206 */
2208 {
2209 bool must_use_password;
2210 char *err;
2212
2214
2215 /* Load the library providing us libpq calls. */
2216 load_file("libpqwalreceiver", false);
2217
2218 /*
2219 * Try to connect to the publisher, using the new connection string if
2220 * available.
2221 */
2224 true, true, must_use_password, sub->name,
2225 &err);
2226 if (!wrconn)
2227 ereport(ERROR,
2229 errmsg("subscription \"%s\" could not connect to the publisher: %s",
2230 sub->name, err)));
2231
2232 PG_TRY();
2233 {
2236
2238 retain_dead_tuples, origin, NULL, 0,
2239 sub->name);
2240
2243 update_failover ? &opts.failover : NULL,
2244 update_two_phase ? &opts.twophase : NULL);
2245 }
2246 PG_FINALLY();
2247 {
2249 }
2250 PG_END_TRY();
2251 }
2252
2254
2256
2257 /* Wake up related replication workers to handle this change quickly. */
2259
2260 return myself;
2261}
2262
2263/*
2264 * Construct conninfo from a subscription's server. Like libpqrcv_connect(),
2265 * if an error occurs, set *err to the error message and return NULL.
 * *err is reset to NULL on entry.
2266 *
2267 * However, failures in ForeignServerConnectionString() may ereport(ERROR),
2268 * and (also like libpqrcv_connect()) it's not worth adding the machinery to
2269 * pass all of those back to the caller just to cover this one case.
2270 */
2271static char *
2273{
2275 ForeignServer *server;
2276
2277 *err = NULL;
2278
2279 server = GetForeignServer(subserver);
2280
2283 if (aclresult != ACLCHECK_OK)
2284 {
2285 /*
2286 * Unable to generate connection string because permissions on the
2287 * foreign server have been removed. Follow the same logic as an
2288 * unusable subconninfo (which will result in an ERROR later unless
2289 * slot_name = NONE).
2290 */
2291 *err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
2293 server->servername);
2294 return NULL;
2295 }
2296
2298}
2299
2300/*
2301 * Drop a subscription.
 *
 * The catalog tuple is deleted and local state is cleaned up first; the
 * connection to the publisher, needed for dropping slots, is attempted only
 * at the end.
2302 */
2303void
2305{
2306 Relation rel;
2308 HeapTuple tup;
2309 Oid subid;
2310 Oid subowner;
2311 Oid subserver;
2312 char *subconninfo = NULL;
2313 Datum datum;
2314 bool isnull;
2315 char *subname;
2316 char *conninfo = NULL;
2317 char *slotname;
2319 ListCell *lc;
2320 char originname[NAMEDATALEN];
2321 char *err = NULL;
2324 List *rstates;
2325 bool must_use_password;
2326
2327 /*
2328 * The launcher may concurrently start a new worker for this subscription.
2329 * During initialization, the worker checks for subscription validity and
2330 * exits if the subscription has already been dropped. See
2331 * InitializeLogRepWorker.
2332 */
2334
2336 CStringGetDatum(stmt->subname));
2337
2338 if (!HeapTupleIsValid(tup))
2339 {
2340 table_close(rel, NoLock);
2341
2342 if (!stmt->missing_ok)
2343 ereport(ERROR,
2345 errmsg("subscription \"%s\" does not exist",
2346 stmt->subname)));
2347 else
2349 (errmsg("subscription \"%s\" does not exist, skipping",
2350 stmt->subname)));
2351
2352 return;
2353 }
2354
2357 if (!isnull)
2358 subconninfo = TextDatumGetCString(datum);
2359
2361 subid = form->oid;
2362 subowner = form->subowner;
2363 subserver = form->subserver;
2364 must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
2365
2366 /* must be owner */
2369 stmt->subname);
2370
2371 /* DROP hook for the subscription being removed */
2373
2374 /*
2375 * Lock the subscription so nobody else can do anything with it (including
2376 * the replication workers).
2377 */
2379
2380 /* Get subname */
2383 subname = pstrdup(NameStr(*DatumGetName(datum)));
2384
2385 /* Get slotname; it stays NULL when the catalog value is null */
2388 if (!isnull)
2389 slotname = pstrdup(NameStr(*DatumGetName(datum)));
2390 else
2391 slotname = NULL;
2392
2393 /*
2394 * Since dropping a replication slot is not transactional, the replication
2395 * slot stays dropped even if the transaction rolls back. So we cannot
2396 * run DROP SUBSCRIPTION inside a transaction block if dropping the
2397 * replication slot. Also, in this case, we report a message for dropping
2398 * the subscription to the cumulative stats system.
2399 *
2400 * XXX The command name should really be something like "DROP SUBSCRIPTION
2401 * of a subscription that is associated with a replication slot", but we
2402 * don't have the proper facilities for that.
2403 */
2404 if (slotname)
2405 PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
2406
2409
2410 /* Remove the tuple from catalog. */
2411 CatalogTupleDelete(rel, &tup->t_self);
2412
2414
2415 /*
2416 * Stop all the subscription workers immediately.
2417 *
2418 * This is necessary if we are dropping the replication slot, so that the
2419 * slot becomes accessible.
2420 *
2421 * It is also necessary if the subscription is disabled and was disabled
2422 * in the same transaction. Then the workers haven't seen the disabling
2423 * yet and will still be running, leading to hangs later when we want to
2424 * drop the replication origin. If the subscription was disabled before
2425 * this transaction, then there shouldn't be any workers left, so this
2426 * won't make a difference.
2427 *
2428 * New workers won't be started because we hold an exclusive lock on the
2429 * subscription till the end of the transaction.
2430 */
2431 subworkers = logicalrep_workers_find(subid, false, true);
2432 foreach(lc, subworkers)
2433 {
2435
2437 }
2439
2440 /*
2441 * Remove the no-longer-useful entry in the launcher's table of apply
2442 * worker start times.
2443 *
2444 * If this transaction rolls back, the launcher might restart a failed
2445 * apply worker before wal_retrieve_retry_interval milliseconds have
2446 * elapsed, but that's pretty harmless.
2447 */
2449
2450 /*
2451 * Cleanup of tablesync replication origins.
2452 *
2453 * Any READY-state relations would already have dealt with clean-ups.
2454 *
2455 * Note that the state can't change because we have already stopped both
2456 * the apply and tablesync workers and they can't restart because of
2457 * exclusive lock on the subscription.
2458 */
2459 rstates = GetSubscriptionRelations(subid, true, false, true);
2460 foreach(lc, rstates)
2461 {
2463 Oid relid = rstate->relid;
2464
2465 /* Only clean up resources of tablesync workers */
2466 if (!OidIsValid(relid))
2467 continue;
2468
2469 /*
2470 * Drop the tablesync's origin tracking if it exists.
2471 *
2472 * It is possible that the origin is not yet created for the tablesync
2473 * worker, so pass missing_ok = true. This can happen for the states
2474 * before SUBREL_STATE_DATASYNC.
2475 */
2477 sizeof(originname));
2478 replorigin_drop_by_name(originname, true, false);
2479 }
2480
2481 /* Clean up dependencies */
2484
2485 /* Remove any associated relation synchronization states. */
2487
2488 /* Remove the origin tracking if it exists. */
2490 replorigin_drop_by_name(originname, true, false);
2491
2492 /*
2493 * Tell the cumulative stats system that the subscription is getting
2494 * dropped.
2495 */
2497
2498 /*
2499 * If there is no slot associated with the subscription and no relation
2500 * states were found above, we can finish here.
2501 */
2502 if (!slotname && rstates == NIL)
2503 {
2504 table_close(rel, NoLock);
2505 return;
2506 }
2507
2508 /*
2509 * Try to acquire the connection necessary for dropping slots.
2510 *
2511 * Note: If the slotname is NONE/NULL then we allow the command to finish
2512 * and users need to manually clean up the apply and tablesync worker slots
2513 * later.
2514 *
2515 * This has to be at the end because otherwise if there is an error while
2516 * doing the database operations we won't be able to roll back the dropped
2517 * slot.
2518 */
2519 load_file("libpqwalreceiver", false);
2520
2521 if (OidIsValid(subserver))
2523 else
2524 conninfo = subconninfo;
2525
2526 if (conninfo)
2527 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2528 subname, &err);
2529
2530 if (wrconn == NULL)
2531 {
2532 if (!slotname)
2533 {
2534 /* be tidy */
2536 table_close(rel, NoLock);
2537 return;
2538 }
2539 else
2540 {
2541 ReportSlotConnectionError(rstates, subid, slotname, err);
2542 }
2543 }
2544
2545 PG_TRY();
2546 {
2547 foreach(lc, rstates)
2548 {
2550 Oid relid = rstate->relid;
2551
2552 /* Only clean up resources of tablesync workers */
2553 if (!OidIsValid(relid))
2554 continue;
2555
2556 /*
2557 * Drop the tablesync slots associated with removed tables.
2558 *
2559 * For SYNCDONE/READY states, the tablesync slot is known to have
2560 * already been dropped by the tablesync worker.
2561 *
2562 * For other states, there is no certainty, maybe the slot does
2563 * not exist yet. Also, if we fail after removing some of the
2564 * slots, next time, it will again try to drop already dropped
2565 * slots and fail. For these reasons, we allow missing_ok = true
2566 * for the drop.
2567 */
2568 if (rstate->state != SUBREL_STATE_SYNCDONE)
2569 {
2570 char syncslotname[NAMEDATALEN] = {0};
2571
2573 sizeof(syncslotname));
2575 }
2576 }
2577
2579
2580 /*
2581 * If there is a slot associated with the subscription, then drop the
2582 * replication slot at the publisher.
2583 */
2584 if (slotname)
2585 ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2586 }
2587 PG_FINALLY();
2588 {
2590 }
2591 PG_END_TRY();
2592
2593 table_close(rel, NoLock);
2594}
2595
2596/*
2597 * Drop the replication slot at the publisher node using the replication
2598 * connection.
2599 *
 * wrconn - connection to the publisher; must not be NULL (asserted below).
 * slotname - name of the slot to drop; it is added to the command with
 * appendQuotedIdentifier().
2600 * missing_ok - if true then only issue a LOG message if the slot doesn't
2601 * exist.
 *
 * The command sent is "DROP_REPLICATION_SLOT <slotname> WAIT". The command
 * buffer is freed in the PG_FINALLY block.
 *
 * NOTE(review): the embedded listing numbers skip 2626, 2632, 2643 and
 * 2648, so parts of the result handling are not visible in this extract.
2602 */
2603void
2604ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
2605{
2606 StringInfoData cmd;
2607
2608 Assert(wrconn);
2609
2610 load_file("libpqwalreceiver", false);
2611
 /* Build the replication command text. */
2612 initStringInfo(&cmd);
2613 appendStringInfoString(&cmd, "DROP_REPLICATION_SLOT ");
2614 appendQuotedIdentifier(&cmd, slotname);
2615 appendStringInfoString(&cmd, " WAIT");
2616
2617 PG_TRY();
2618 {
2619 WalRcvExecResult *res;
2620
 /* No result columns are requested (0, NULL). */
2621 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2622
2623 if (res->status == WALRCV_OK_COMMAND)
2624 {
2625 /* NOTICE. Success. */
2627 (errmsg("dropped replication slot \"%s\" on publisher",
2628 slotname)));
2629 }
2630 else if (res->status == WALRCV_ERROR &&
2631 missing_ok &&
2633 {
2634 /* LOG. Error, but missing_ok = true. */
2635 ereport(LOG,
2636 (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2637 slotname, res->err)));
2638 }
2639 else
2640 {
2641 /* ERROR. */
2642 ereport(ERROR,
2644 errmsg("could not drop replication slot \"%s\" on publisher: %s",
2645 slotname, res->err)));
2646 }
2647
2649 }
2650 PG_FINALLY();
2651 {
 /* Release the command buffer built above. */
2652 pfree(cmd.data);
2653 }
2654 PG_END_TRY();
2655}
2656
2657/*
2658 * Internal workhorse for changing a subscription owner
 *
 * Returns without doing anything if the subscription is already owned by
 * newOwnerId. Among the checks visible here: a non-superuser may not
 * modify a subscription with password_required=false, and, when the
 * subscription uses a foreign server, the new owner needs permission on
 * that server. On success, subowner is overwritten in the given tuple and
 * the tuple is written back with CatalogTupleUpdate().
 *
 * NOTE(review): the embedded listing numbering has gaps (including the
 * signature line 2661), so several statements of this function are not
 * visible in this extract.
2659 */
2660static void
2662{
2665
2667
 /* Nothing to do if the owner is unchanged. */
2668 if (form->subowner == newOwnerId)
2669 return;
2670
2673 NameStr(form->subname));
2674
2675 /*
2676 * Don't allow non-superuser modification of a subscription with
2677 * password_required=false.
2678 */
2679 if (!form->subpasswordrequired && !superuser())
2680 ereport(ERROR,
2682 errmsg("password_required=false is superuser-only"),
2683 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2684
2685 /* Must be able to become new owner */
2687
2688 /*
2689 * current owner must have CREATE on database
2690 *
2691 * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2692 * other object types behave differently (e.g. you can't give a table to a
2693 * user who lacks CREATE privileges on a schema).
2694 */
2697 if (aclresult != ACLCHECK_OK)
2700
2701 /*
2702 * If the subscription uses a server, check that the new owner has USAGE
2703 * privileges on the server and that a user mapping exists. Note: does not
2704 * re-check the resulting connection string.
2705 */
2706 if (OidIsValid(form->subserver))
2707 {
2708 ForeignServer *server = GetForeignServer(form->subserver);
2709
2711 if (aclresult != ACLCHECK_OK)
2712 ereport(ERROR,
2714 errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
2716 server->servername));
2717
2718 /* make sure a user mapping exists */
2720 }
2721
 /* Store the new owner in the tuple and write it to the catalog. */
2722 form->subowner = newOwnerId;
2723 CatalogTupleUpdate(rel, &tup->t_self, tup);
2724
2725 /* Update owner dependency reference */
2727 form->oid,
2728 newOwnerId);
2729
2731 form->oid, 0);
2732
2733 /* Wake up related background processes to handle this change quickly. */
2736}
2737
2738/*
2739 * Change subscription owner -- by name
 *
 * Raises an ERROR ("subscription ... does not exist") when no valid tuple
 * is found for the given name. Returns the local ObjectAddress "address".
 *
 * NOTE(review): the signature and most statements are absent from this
 * extract (the embedded listing numbering has gaps), so how the tuple is
 * looked up and how "address" is filled in is not visible here.
2740 */
2743{
2744 Oid subid;
2745 HeapTuple tup;
2746 Relation rel;
2747 ObjectAddress address;
2749
2751
2754
 /* The lookup found no subscription with that name. */
2755 if (!HeapTupleIsValid(tup))
2756 ereport(ERROR,
2758 errmsg("subscription \"%s\" does not exist", name)));
2759
 /* Remember the subscription's OID from the catalog row. */
2761 subid = form->oid;
2762
2764
2766
2768
2770
2771 return address;
2772}
2773
2774/*
2775 * Change subscription owner -- by OID
 *
 * Raises an ERROR ("subscription with OID ... does not exist") when no
 * valid tuple is found for subid.
 *
 * NOTE(review): the signature and most statements are absent from this
 * extract (the embedded listing numbering has gaps); only the lookup
 * failure check is visible.
2776 */
2777void
2779{
2780 HeapTuple tup;
2781 Relation rel;
2782
2784
2786
 /* The lookup found no subscription with that OID. */
2787 if (!HeapTupleIsValid(tup))
2788 ereport(ERROR,
2790 errmsg("subscription with OID %u does not exist", subid)));
2791
2793
2795
2797}
2798
2799/*
2800 * Check and log a warning if the publisher has subscribed to the same table,
2801 * its partition ancestors (if it's a partition), or its partition children (if
2802 * it's a partitioned table), from some other publishers. This check is
2803 * required in the following scenarios:
2804 *
2805 * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2806 * statements with "copy_data = true" and "origin = none":
2807 * - Warn the user that data with an origin might have been copied.
2808 * - This check is skipped for tables already added, as incremental sync via
2809 * WAL allows origin tracking. The list of such tables is in
2810 * subrel_local_oids.
2811 *
2812 * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2813 * statements with "retain_dead_tuples = true" and "origin = any", and for
2814 * ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin,
2815 * or when the publisher's status changes (e.g., due to a connection string
2816 * update):
2817 * - Warn the user that only conflict detection info for local changes on
2818 * the publisher is retained. Data from other origins may lack sufficient
2819 * details for reliable conflict detection.
2820 * - See comments atop worker.c for more details.
 *
 * The check runs a query on the publisher through wrconn and raises an
 * ERROR if that query does not return tuples. subrel_local_oids is read
 * as an array of subrel_count entries. subname is used only in the
 * messages.
 *
 * NOTE(review): the embedded listing numbering has gaps (including the
 * first signature line and the statements computing check_rdt and
 * check_table_sync), so parts of this function are not visible in this
 * extract.
2821 */
2822static void
2824 bool copydata, bool retain_dead_tuples,
2825 char *origin, Oid *subrel_local_oids,
2826 int subrel_count, char *subname)
2827{
2828 WalRcvExecResult *res;
2829 StringInfoData cmd;
2830 TupleTableSlot *slot;
2831 Oid tableRow[1] = {TEXTOID};
2832 List *publist = NIL;
2833 int i;
2834 bool check_rdt;
2835 bool check_table_sync;
 /* A NULL origin is never treated as 'none'. */
2836 bool origin_none = origin &&
2838
2839 /*
2840 * Enable retain_dead_tuples checks only when origin is set to 'any',
2841 * since with origin='none' only local changes are replicated to the
2842 * subscriber.
2843 */
2845
2846 /*
2847 * Enable table synchronization checks only when origin is 'none', to
2848 * ensure that data from other origins is not inadvertently copied.
2849 */
2851
2852 /* retain_dead_tuples and table sync checks occur separately */
2854
2855 /* Return if no checks are required */
2856 if (!check_rdt && !check_table_sync)
2857 return;
2858
 /*
 * Ask the publisher which of the given publications contain tables that
 * also appear in its pg_subscription_rel, either directly or through
 * pg_partition_ancestors / pg_partition_tree.
 */
2859 initStringInfo(&cmd);
2861 "SELECT DISTINCT P.pubname AS pubname\n"
2862 "FROM pg_publication P,\n"
2863 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2864 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2865 " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2866 " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2867 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2868 "WHERE C.oid = GPT.relid AND P.pubname IN (");
2869 GetPublicationsStr(publications, &cmd, true);
2870 appendStringInfoString(&cmd, ")\n");
2871
2872 /*
2873 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2874 * subrel_local_oids contains the list of relation oids that are already
2875 * present on the subscriber. This check should be skipped for these
2876 * tables if checking for table sync scenario. However, when handling the
2877 * retain_dead_tuples scenario, ensure all tables are checked, as some
2878 * existing tables may now include changes from other origins due to newly
2879 * created subscriptions on the publisher.
2880 */
2881 if (check_table_sync)
2882 {
2883 for (i = 0; i < subrel_count; i++)
2884 {
 /*
 * Exclude this relation by schema and table name, both quoted as
 * SQL literals.
 *
 * NOTE(review): no NULL check on the looked-up names is visible
 * here; confirm the lookups cannot fail for these relids.
 */
2885 Oid relid = subrel_local_oids[i];
2886 char *schemaname = get_namespace_name(get_rel_namespace(relid));
2887 char *tablename = get_rel_name(relid);
2888 char *schemaname_lit = quote_literal_cstr(schemaname);
2889 char *tablename_lit = quote_literal_cstr(tablename);
2890
2891 appendStringInfo(&cmd, "AND NOT (N.nspname = %s AND C.relname = %s)\n",
2893
2896 }
2897 }
2898
 /* Run the query; one TEXT column (the publication name) is expected. */
2899 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2900 pfree(cmd.data);
2901
2902 if (res->status != WALRCV_OK_TUPLES)
2903 ereport(ERROR,
2905 errmsg("could not receive list of replicated tables from the publisher: %s",
2906 res->err)));
2907
2908 /* Process publications. */
2910 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2911 {
2912 char *pubname;
2913 bool isnull;
2914
2915 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2916 Assert(!isnull);
2917
2918 ExecClearTuple(slot);
2920 }
2921
2922 /*
2923 * Log a warning if the publisher has subscribed to the same table from
2924 * some other publisher. We cannot know the origin of data during the
2925 * initial sync. Data origins can be found only from the WAL by looking at
2926 * the origin id.
2927 *
2928 * XXX: For simplicity, we don't check whether the table has any data or
2929 * not. If the table doesn't have any data then we don't need to
2930 * distinguish between data having origin and data not having origin so we
2931 * can avoid logging a warning for table sync scenario.
2932 */
2933 if (publist)
2934 {
2936
2937 /* Prepare the list of publication(s) for warning message. */
2940
 /* The message wording depends on which of the two checks is active. */
2941 if (check_table_sync)
2944 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2945 subname),
2946 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2947 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2949 errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
2950 else
2953 errmsg("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
2954 subname),
2955 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2956 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2958 errhint("Consider using origin = NONE or disabling retain_dead_tuples."));
2959 }
2960
2962
2964}
2965
2966/*
2967 * This function is similar to check_publications_origin_tables and serves
2968 * the same purpose for sequences.
 *
 * It returns without querying the publisher unless copydata is true,
 * origin is 'none' (compared case-insensitively) and the publisher's
 * version is at least 190000. Otherwise it runs a query through wrconn,
 * raises an ERROR if the query does not return tuples, and reports the
 * publications found.
 *
 * NOTE(review): the embedded listing numbering has gaps (including parts
 * of the signature), so some statements are not visible in this extract.
2969 */
2970static void
2972 bool copydata, char *origin,
2974 char *subname)
2975{
2976 WalRcvExecResult *res;
2977 StringInfoData cmd;
2978 TupleTableSlot *slot;
2979 Oid tableRow[1] = {TEXTOID};
2980 List *publist = NIL;
2981
2982 /*
2983 * Enable sequence synchronization checks only when origin is 'none', to
2984 * ensure that sequence data from other origins is not inadvertently
2985 * copied. This check is necessary if the publisher is running PG19 or
2986 * later, where logical replication sequence synchronization is supported.
 *
 * NOTE(review): origin is passed to pg_strcasecmp() without a NULL
 * check here, whereas check_publications_origin_tables tests
 * "origin &&" first; confirm callers never pass NULL when copydata is
 * true.
2987 */
2988 if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0 ||
2989 walrcv_server_version(wrconn) < 190000)
2990 return;
2991
 /*
 * Ask the publisher which of the given publications contain sequences
 * that also appear in its pg_subscription_rel.
 */
2992 initStringInfo(&cmd);
2994 "SELECT DISTINCT P.pubname AS pubname\n"
2995 "FROM pg_publication P,\n"
2996 " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
2997 " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
2998 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2999 "WHERE C.oid = GPS.relid AND P.pubname IN (");
3000
3001 GetPublicationsStr(publications, &cmd, true);
3002 appendStringInfoString(&cmd, ")\n");
3003
3004 /*
3005 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
3006 * subrel_local_oids contains the list of relations that are already
3007 * present on the subscriber. This check should be skipped as these will
3008 * not be re-synced.
3009 */
3010 for (int i = 0; i < subrel_count; i++)
3011 {
 /*
 * Exclude this relation by schema and sequence name, both quoted as
 * SQL literals.
 *
 * NOTE(review): no NULL check on the looked-up names is visible here;
 * confirm the lookups cannot fail for these relids.
 */
3012 Oid relid = subrel_local_oids[i];
3013 char *schemaname = get_namespace_name(get_rel_namespace(relid));
3014 char *seqname = get_rel_name(relid);
3015 char *schemaname_lit = quote_literal_cstr(schemaname);
3016 char *seqname_lit = quote_literal_cstr(seqname);
3017
3018 appendStringInfo(&cmd,
3019 "AND NOT (N.nspname = %s AND C.relname = %s)\n",
3021
3024 }
3025
 /* Run the query; one TEXT column (the publication name) is expected. */
3026 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
3027 pfree(cmd.data);
3028
3029 if (res->status != WALRCV_OK_TUPLES)
3030 ereport(ERROR,
3032 errmsg("could not receive list of replicated sequences from the publisher: %s",
3033 res->err)));
3034
3035 /* Process publications. */
3037 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3038 {
3039 char *pubname;
3040 bool isnull;
3041
3042 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
3043 Assert(!isnull);
3044
3045 ExecClearTuple(slot);
3047 }
3048
3049 /*
3050 * Log a warning if the publisher has subscribed to the same sequence from
3051 * some other publisher. We cannot know the origin of sequence data
3052 * during the initial sync.
3053 */
3054 if (publist)
3055 {
3057
3058 /* Prepare the list of publication(s) for warning message. */
3061
3064 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
3065 subname),
3066 errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
3067 "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
3069 errhint("Verify that initial data copied from the publisher sequences did not come from other origins."));
3070 }
3071
3073
3075}
3076
3077/*
3078 * Determine whether retain_dead_tuples can be enabled based on the
3079 * publisher's status.
3080 *
3081 * This option is disallowed if the publisher is running a version earlier
3082 * than PG19, or if the publisher is in recovery (i.e., it is a standby
3083 * server).
3084 *
3085 * See comments atop worker.c for a detailed explanation.
 *
 * Both conditions are reported with ERROR. The recovery state is obtained
 * by running "SELECT pg_is_in_recovery()" on the publisher through wrconn.
 *
 * NOTE(review): the embedded listing numbering has gaps (including the
 * signature line), so some statements are not visible in this extract.
3086 */
3087static void
3089{
3090 WalRcvExecResult *res;
3091 Oid RecoveryRow[1] = {BOOLOID};
3092 TupleTableSlot *slot;
3093 bool isnull;
3094 bool remote_in_recovery;
3095
 /* Version check comes first, before any query is sent. */
3096 if (walrcv_server_version(wrconn) < 190000)
3097 ereport(ERROR,
3099 errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
3100
 /* One BOOL column is expected in the result. */
3101 res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
3102
3103 if (res->status != WALRCV_OK_TUPLES)
3104 ereport(ERROR,
3106 errmsg("could not obtain recovery progress from the publisher: %s",
3107 res->err)));
3108
3110 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3111 elog(ERROR, "failed to fetch tuple for the recovery progress");
3112
3113 remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
3114
3116 ereport(ERROR,
3118 errmsg("cannot enable retain_dead_tuples if the publisher is in recovery"));
3119
3121
3123}
3124
3125/*
3126 * Check if the subscriber's configuration is adequate to enable the
3127 * retain_dead_tuples option.
3128 *
3129 * Issue an ERROR if the wal_level does not support the use of replication
3130 * slots when check_guc is set to true.
3131 *
3132 * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
3133 * set to true. This is only to highlight the importance of enabling
3134 * track_commit_timestamp instead of catching all the misconfigurations, as
3135 * this setting can be adjusted after subscription creation. Without it, the
3136 * apply worker will simply skip conflict detection.
3137 *
3138 * Issue a WARNING or NOTICE if the subscription is disabled and the retention
3139 * is active. Do not raise an ERROR since users can only modify
3140 * retain_dead_tuples for disabled subscriptions. And as long as the
3141 * subscription is enabled promptly, it will not pose issues.
3142 *
3143 * Issue a NOTICE to inform users that max_retention_duration is
3144 * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
3145 * is not issued because setting max_retention_duration causes no harm,
3146 * even when it is ineffective.
 *
 * NOTE(review): the embedded listing numbering has gaps (most of the
 * signature and every condition guarding the reports below), so only the
 * message texts and the max_retention_set branch are visible in this
 * extract.
3147 */
3148void
3152 bool max_retention_set)
3153{
3156
3158 {
3160 ereport(ERROR,
3162 errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
3163 errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
3164
3168 errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
3169 errhint("Consider setting \"%s\" to true.",
3170 "track_commit_timestamp"));
3171
3175 errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
 /* The hint is attached conditionally; 0 is passed when it is omitted. */
3177 ? errhint("Consider setting %s to false.",
3178 "retain_dead_tuples") : 0);
3179 }
3180 else if (max_retention_set)
3181 {
3184 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
3185 }
3186}
3187
3188/*
3189 * Return true iff 'rv' is a member of the list.
 *
 * Membership is decided by comparing 'rv' with the rv field of each list
 * element using equal().
 *
 * NOTE(review): the signature and the loop header are absent from this
 * extract (the embedded listing numbering skips 3192 and 3194).
3190 */
3191static bool
3193{
3195 {
3196 if (equal(relinfo->rv, rv))
3197 return true;
3198 }
3199
 /* No element matched. */
3200 return false;
3201}
3202
3203/*
3204 * Get the list of tables and sequences which belong to specified publications
3205 * on the publisher connection.
3206 *
3207 * Note that we don't support the case where the column list is different for
3208 * the same table in different publications to avoid sending unwanted column
3209 * information for some of the rows. This can happen when both the column
3210 * list and row filter are specified for different publications.
 *
 * The query text depends on the publisher's version (thresholds 150000,
 * 160000 and 190000 below). An ERROR is raised if the query does not
 * return tuples, or if a table is found with different column lists in
 * different publications. Returns relationlist.
 *
 * NOTE(review): the embedded listing numbering has gaps (including the
 * signature line and the statement that runs the query), so some
 * statements are not visible in this extract.
3211 */
3212static List *
3214{
3215 WalRcvExecResult *res;
3216 StringInfoData cmd;
3217 TupleTableSlot *slot;
 /* A fourth result column is expected only when column lists are checked. */
3221 bool check_columnlist = (server_version >= 150000);
3222 int column_count = check_columnlist ? 4 : 3;
3223 StringInfoData pub_names;
3224
3225 initStringInfo(&cmd);
3226 initStringInfo(&pub_names);
3227
3228 /* Build the pub_names comma-separated string. */
3229 GetPublicationsStr(publications, &pub_names, true);
3230
3231 /* Get the list of relations from the publisher */
3232 if (server_version >= 160000)
3233 {
3235
3236 /*
3237 * From version 16, we allowed passing multiple publications to the
3238 * function pg_get_publication_tables. This helped to filter out the
3239 * partition table whose ancestor is also published in this
3240 * publication array.
3241 *
3242 * Join pg_get_publication_tables with pg_publication to exclude
3243 * non-existing publications.
3244 *
3245 * Note that attrs are always stored in sorted order so we don't need
3246 * to worry if different publications have specified them in a
3247 * different order. See pub_collist_validate.
3248 */
3249 appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
3250 " FROM pg_class c\n"
3251 " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
3252 " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
3253 " FROM pg_publication\n"
3254 " WHERE pubname IN ( %s )) AS gpt\n"
3255 " ON gpt.relid = c.oid\n",
3256 pub_names.data);
3257
3258 /* From version 19, inclusion of sequences in the target is supported */
3259 if (server_version >= 190000)
3260 appendStringInfo(&cmd,
3261 "UNION ALL\n"
3262 " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
3263 " FROM pg_catalog.pg_publication_sequences s\n"
3264 " WHERE s.pubname IN ( %s )",
3265 pub_names.data);
3266 }
3267 else
3268 {
 /* Older publishers: read from the pg_publication_tables view. */
3270 appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
3271
3272 /* Get column lists for each relation if the publisher supports it */
3273 if (check_columnlist)
3274 appendStringInfoString(&cmd, ", t.attnames\n");
3275
3276 appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
3277 " WHERE t.pubname IN ( %s )",
3278 pub_names.data);
3279 }
3280
3281 pfree(pub_names.data);
3282
3284 pfree(cmd.data);
3285
3286 if (res->status != WALRCV_OK_TUPLES)
3287 ereport(ERROR,
3289 errmsg("could not receive list of replicated tables from the publisher: %s",
3290 res->err)));
3291
3292 /* Process tables. */
3294 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3295 {
3296 char *nspname;
3297 char *relname;
3298 bool isnull;
3299 char relkind;
3301
 /* Columns 1-3 are schema name, relation name and relkind. */
3302 nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
3303 Assert(!isnull);
3304 relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
3305 Assert(!isnull);
3306 relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
3307 Assert(!isnull);
3308
3309 relinfo->rv = makeRangeVar(nspname, relname, -1);
3310 relinfo->relkind = relkind;
3311
 /* The column-list conflict check is not applied to sequences. */
3312 if (relkind != RELKIND_SEQUENCE &&
3315 ereport(ERROR,
3317 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
3318 nspname, relname));
3319 else
3321
3322 ExecClearTuple(slot);
3323 }
3325
3327
3328 return relationlist;
3329}
3330
3331/*
3332 * This is to report the connection failure while dropping replication slots.
3333 * Here, we report the WARNING for all tablesync slots so that user can drop
3334 * them manually, if required.
 *
 * rstates - list of relation states to walk; entries with an invalid relid
 * or in SUBREL_STATE_SYNCDONE state are skipped.
 * subid - subscription OID.
 * slotname - name of the subscription's slot, used in the final message.
 * err - connection error text, appended to the final message.
 *
 * After the warnings, an ERROR is raised unconditionally.
 *
 * NOTE(review): the embedded listing numbering skips 3343, 3358 and 3366,
 * so the declaration of rstate, the call that fills syncslotname and one
 * ereport argument are not visible in this extract.
3335 */
3336static void
3337ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
3338{
3339 ListCell *lc;
3340
3341 foreach(lc, rstates)
3342 {
3344 Oid relid = rstate->relid;
3345
3346 /* Only cleanup resources of tablesync workers */
3347 if (!OidIsValid(relid))
3348 continue;
3349
3350 /*
3351 * Caller needs to ensure that relstate doesn't change underneath us.
3352 * See DropSubscription where we get the relstates.
3353 */
3354 if (rstate->state != SUBREL_STATE_SYNCDONE)
3355 {
3356 char syncslotname[NAMEDATALEN] = {0};
3357
3359 sizeof(syncslotname));
3360 elog(WARNING, "could not drop tablesync replication slot \"%s\"",
3361 syncslotname);
3362 }
3363 }
3364
 /* Finally report the connection failure itself. */
3365 ereport(ERROR,
3367 errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
3368 slotname, err),
3369 /* translator: %s is an SQL ALTER command */
3370 errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
3371 "ALTER SUBSCRIPTION ... DISABLE",
3372 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
3373}
3374
3375/*
3376 * Check for duplicates in the given list of publications and error out if
3377 * found one. Add publications to datums as text datums, if datums is not
3378 * NULL.
 *
 * Names are compared with strcmp(). When datums is given, entries are
 * stored in list order starting at index 0; no bound on the array is
 * checked here, so the caller presumably sizes it for the whole list.
 *
 * NOTE(review): the signature line and one ereport argument are absent
 * from this extract (the embedded listing numbering skips 3381 and 3400).
3379 */
3380static void
3382{
3383 ListCell *cell;
3384 int j = 0;
3385
3386 foreach(cell, publist)
3387 {
3388 char *name = strVal(lfirst(cell));
3389 ListCell *pcell;
3390
 /*
 * Compare this name only with the entries that precede it: the inner
 * loop stops as soon as it reaches the outer loop's cell.
 */
3391 foreach(pcell, publist)
3392 {
3393 char *pname = strVal(lfirst(pcell));
3394
3395 if (pcell == cell)
3396 break;
3397
3398 if (strcmp(name, pname) == 0)
3399 ereport(ERROR,
3401 errmsg("publication name \"%s\" used more than once",
3402 pname)));
3403 }
3404
3405 if (datums)
3406 datums[j++] = CStringGetTextDatum(name);
3407 }
3408}
3409
3410/*
3411 * Merge current subscription's publications and user-specified publications
3412 * from ADD/DROP PUBLICATIONS.
3413 *
3414 * If addpub is true, we will add the list of publications into oldpublist.
3415 * Otherwise, we will delete the list of publications from oldpublist. The
3416 * returned list is a copy, oldpublist itself is not changed.
3417 *
3418 * subname is the subscription name, for error messages.
 *
 * Errors are raised when adding a publication that is already present,
 * when dropping one that is not present, and when the resulting list is
 * empty.
 *
 * NOTE(review): the embedded listing numbering has gaps (including the
 * signature line and the statements before the loop), so the statement
 * that makes the copy promised above, and the add/remove statements
 * themselves, are not visible in this extract.
3419 */
3420static List *
3422{
3423 ListCell *lc;
3424
3426
3428
3429 foreach(lc, newpublist)
3430 {
3431 char *name = strVal(lfirst(lc));
3432 ListCell *lc2;
3433 bool found = false;
3434
 /* Look for this name among the existing publications. */
3435 foreach(lc2, oldpublist)
3436 {
3437 char *pubname = strVal(lfirst(lc2));
3438
3439 if (strcmp(name, pubname) == 0)
3440 {
3441 found = true;
3442 if (addpub)
3443 ereport(ERROR,
3445 errmsg("publication \"%s\" is already in subscription \"%s\"",
3446 name, subname)));
3447 else
3449
3450 break;
3451 }
3452 }
3453
3454 if (addpub && !found)
3456 else if (!addpub && !found)
3457 ereport(ERROR,
3459 errmsg("publication \"%s\" is not in subscription \"%s\"",
3460 name, subname)));
3461 }
3462
3463 /*
3464 * XXX Probably no strong reason for this, but for now it's to make ALTER
3465 * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
3466 */
3467 if (!oldpublist)
3468 ereport(ERROR,
3470 errmsg("cannot drop all the publications from a subscription")));
3471
3472 return oldpublist;
3473}
3474
3475/*
3476 * Extract the streaming mode value from a DefElem. This is like
3477 * defGetBoolean() but also accepts the special value of "parallel".
 *
 * Returns LOGICALREP_STREAM_ON when no argument is given. Integer
 * arguments 0 and 1 map to LOGICALREP_STREAM_OFF and LOGICALREP_STREAM_ON.
 * Any other argument is read as a string and compared case-insensitively.
 * A value that matches none of the accepted forms raises an ERROR.
 *
 * NOTE(review): the embedded listing numbering skips 3480, 3520 and 3526,
 * so the signature line, the statement executed for "parallel" and one
 * ereport argument are not visible in this extract.
3478 */
3479char
3481{
3482 /*
3483 * If no parameter value given, assume "true" is meant.
3484 */
3485 if (!def->arg)
3486 return LOGICALREP_STREAM_ON;
3487
3488 /*
3489 * Allow 0, 1, "false", "true", "off", "on" or "parallel".
3490 */
3491 switch (nodeTag(def->arg))
3492 {
3493 case T_Integer:
3494 switch (intVal(def->arg))
3495 {
3496 case 0:
3497 return LOGICALREP_STREAM_OFF;
3498 case 1:
3499 return LOGICALREP_STREAM_ON;
3500 default:
3501 /* otherwise, error out below */
3502 break;
3503 }
3504 break;
3505 default:
3506 {
3507 char *sval = defGetString(def);
3508
3509 /*
3510 * The set of strings accepted here should match up with the
3511 * grammar's opt_boolean_or_string production.
3512 */
3513 if (pg_strcasecmp(sval, "false") == 0 ||
3514 pg_strcasecmp(sval, "off") == 0)
3515 return LOGICALREP_STREAM_OFF;
3516 if (pg_strcasecmp(sval, "true") == 0 ||
3517 pg_strcasecmp(sval, "on") == 0)
3518 return LOGICALREP_STREAM_ON;
3519 if (pg_strcasecmp(sval, "parallel") == 0)
3521 }
3522 break;
3523 }
3524
 /* Reached only when no accepted value matched above. */
3525 ereport(ERROR,
3527 errmsg("%s requires a Boolean value or \"parallel\"",
3528 def->defname)));
3529 return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
3530}
void check_can_set_role(Oid member, Oid role)
Definition acl.c:5371
AclResult
Definition acl.h:183
@ ACLCHECK_OK
Definition acl.h:184
@ ACLCHECK_NOT_OWNER
Definition acl.h:186
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition aclchk.c:2672
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition aclchk.c:3880
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition aclchk.c:4134
void LogicalRepWorkersWakeupAtCommit(Oid subid)
Definition worker.c:6334
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:648
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define TextDatumGetCString(d)
Definition builtins.h:99
#define NameStr(name)
Definition c.h:891
#define Assert(condition)
Definition c.h:999
#define CppAsString2(x)
Definition c.h:562
int32_t int32
Definition c.h:676
uint32_t uint32
Definition c.h:680
#define OidIsValid(objectId)
Definition c.h:914
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition catalog.c:448
bool track_commit_timestamp
Definition commit_ts.c:121
int32 defGetInt32(DefElem *def)
Definition define.c:148
char * defGetString(DefElem *def)
Definition define.c:34
bool defGetBoolean(DefElem *def)
Definition define.c:93
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition define.c:370
@ DEPENDENCY_NORMAL
Definition dependency.h:33
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
int errcode(int sqlerrcode)
Definition elog.c:875
#define _(x)
Definition elog.c:96
#define LOG
Definition elog.h:32
int errhint(const char *fmt,...) pg_attribute_printf(1
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:374
#define WARNING
Definition elog.h:37
#define PG_END_TRY(...)
Definition elog.h:399
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define NOTICE
Definition elog.h:36
#define PG_FINALLY(...)
Definition elog.h:391
#define ereport(elevel,...)
Definition elog.h:152
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...) pg_attribute_printf(1
bool equal(const void *a, const void *b)
Definition equalfuncs.c:223
void err(int eval, const char *fmt,...)
Definition err.c:43
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
Definition execTuples.c:86
#define palloc_object(type)
Definition fe_memutils.h:89
#define DirectFunctionCall1(func, arg1)
Definition fmgr.h:688
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition foreign.c:185
char * ForeignServerConnectionString(Oid userid, ForeignServer *server)
Definition foreign.c:202
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition foreign.c:232
ForeignServer * GetForeignServer(Oid serverid)
Definition foreign.c:114
Oid MyDatabaseId
Definition globals.c:96
bool parse_int(const char *value, int *result, int flags, const char **hintmsg)
Definition guc.c:2775
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition guc.c:3248
@ GUC_ACTION_SET
Definition guc.h:203
@ PGC_S_TEST
Definition guc.h:125
@ PGC_BACKEND
Definition guc.h:77
const char * str
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1118
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define stmt
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
long val
Definition informix.c:689
int j
Definition isn.c:78
int i
Definition isn.c:77
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:303
void ApplyLauncherWakeupAtCommit(void)
Definition launcher.c:1185
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:662
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1155
List * lappend(List *list, void *datum)
Definition list.c:339
List * list_append_unique(List *list, void *datum)
Definition list.c:1343
List * list_copy(const List *oldlist)
Definition list.c:1573
void list_free(List *list)
Definition list.c:1546
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define AccessShareLock
Definition lockdefs.h:36
#define RowExclusiveLock
Definition lockdefs.h:38
char * get_rel_name(Oid relid)
Definition lsyscache.c:2234
char * get_database_name(Oid dbid)
Definition lsyscache.c:1384
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2309
Oid get_rel_namespace(Oid relid)
Definition lsyscache.c:2258
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3674
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition makefuncs.c:473
char * pstrdup(const char *in)
Definition mcxt.c:1910
void pfree(void *pointer)
Definition mcxt.c:1619
void * palloc(Size size)
Definition mcxt.c:1390
Oid GetUserId(void)
Definition miscinit.c:470
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition miscinit.c:990
Datum namein(PG_FUNCTION_ARGS)
Definition name.c:48
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition namespace.h:98
#define nodeTag(nodeptr)
Definition nodes.h:137
static char * errmsg
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define InvokeObjectDropHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
int oid_cmp(const void *p1, const void *p2)
Definition oid.c:287
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:274
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:243
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
Definition origin.c:1057
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:459
@ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SERVER
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH_SEQUENCES
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
#define ACL_USAGE
Definition parsenodes.h:84
@ OBJECT_DATABASE
@ OBJECT_FOREIGN_SERVER
@ OBJECT_SUBSCRIPTION
#define ACL_CREATE
Definition parsenodes.h:85
static AmcheckOptions opts
Definition pg_amcheck.c:112
NameData relname
Definition pg_class.h:40
#define NAMEDATALEN
void recordDependencyOn(const ObjectAddress *depender, const ObjectAddress *referenced, DependencyType behavior)
Definition pg_depend.c:51
long deleteDependencyRecordsFor(Oid classId, Oid objectId, bool skipExtensionDeps)
Definition pg_depend.c:314
long deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char deptype, Oid refclassId, Oid refobjectId)
Definition pg_depend.c:411
static int server_version
Definition pg_dumpall.c:109
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define foreach_delete_current(lst, var_or_cell)
Definition pg_list.h:423
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition pg_lsn.c:64
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static XLogRecPtr DatumGetLSN(Datum X)
Definition pg_lsn.h:25
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
Subscription * GetSubscription(Oid subid, bool missing_ok, bool conninfo_needed, bool conninfo_aclcheck)
void RemoveSubscriptionRel(Oid subid, Oid relid)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
NameData subname
END_CATALOG_STRUCT typedef FormData_pg_subscription * Form_pg_subscription
static char buf[DEFAULT_XLOG_SEG_SIZE]
void pgstat_drop_subscription(Oid subid)
void pgstat_create_subscription(Oid subid)
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
Definition port.h:496
static bool DatumGetBool(Datum X)
Definition postgres.h:100
static Name DatumGetName(Datum X)
Definition postgres.h:393
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static char DatumGetChar(Datum X)
Definition postgres.h:122
static Datum CStringGetDatum(const char *X)
Definition postgres.h:383
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define InvalidOid
unsigned int Oid
char * c
static int fb(int x)
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
char * quote_literal_cstr(const char *rawstr)
Definition quote.c:101
#define RelationGetDescr(relation)
Definition rel.h:542
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:265
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition stringinfo.c:242
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
char * defname
Definition parsenodes.h:862
Node * arg
Definition parsenodes.h:863
char * servername
Definition foreign.h:40
Definition pg_list.h:54
LogicalRepWorkerType type
char * relname
Definition primnodes.h:84
char * schemaname
Definition primnodes.h:81
uint32 specified_opts
bool retaindeadtuples
char * wal_receiver_timeout
char * synchronous_commit
int32 maxretention
char * slot_name
XLogRecPtr lsn
bool passwordrequired
Tuplestorestate * tuplestore
TupleDesc tupledesc
WalRcvExecStatus status
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
#define SUBOPT_STREAMING
char defGetStreamingMode(DefElem *def)
#define SUBOPT_CREATE_SLOT
#define SUBOPT_PASSWORD_REQUIRED
#define SUBOPT_SYNCHRONOUS_COMMIT
#define SUBOPT_ENABLED
static void check_duplicates_in_publist(List *publist, Datum *datums)
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
#define SUBOPT_RETAIN_DEAD_TUPLES
#define SUBOPT_ORIGIN
static Datum publicationListToArray(List *publist)
#define SUBOPT_FAILOVER
static void check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static void check_publications(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_SLOT_NAME
static char * construct_subserver_conninfo(Oid subserver, Oid subowner, char **err)
static List * fetch_relation_list(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_COPY_DATA
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
#define SUBOPT_DISABLE_ON_ERR
static void parse_subscription_options(ParseState *pstate, List *stmt_options, uint32 supported_opts, SubOpts *opts)
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
static void AlterSubscription_refresh_seq(Subscription *sub)
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
#define appendQuotedIdentifier(b, s)
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
#define SUBOPT_LSN
#define SUBOPT_MAX_RETENTION_DURATION
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
#define SUBOPT_WAL_RECEIVER_TIMEOUT
static void check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static bool list_member_rangevar(const List *list, RangeVar *rv)
#define SUBOPT_BINARY
#define IsSet(val, bits)
#define SUBOPT_REFRESH
static void appendQuotedString(StringInfo buf, const char *str, char quote)
#define SUBOPT_CONNECT
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
bool superuser_arg(Oid roleid)
Definition superuser.c:57
bool superuser(void)
Definition superuser.c:47
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache2(SysCacheIdentifier cacheId, Datum key1, Datum key2)
Definition syscache.c:231
Datum SysCacheGetAttrNotNull(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition syscache.c:626
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:596
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition syscache.h:93
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition syscache.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition tablesync.c:1236
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition tablesync.c:1681
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition tuptable.h:417
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
bool LookupGXactBySubid(Oid subid)
Definition twophase.c:2803
String * makeString(char *str)
Definition value.c:63
#define intVal(v)
Definition value.h:79
#define strVal(v)
Definition value.h:82
const char * name
static WalReceiverConn * wrconn
Definition walreceiver.c:95
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
@ WALRCV_OK_COMMAND
@ WALRCV_ERROR
@ WALRCV_OK_TUPLES
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23
@ WORKERTYPE_TABLESYNC
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3701
int wal_level
Definition xlog.c:138
@ WAL_LEVEL_REPLICA
Definition xlog.h:77
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28