PostgreSQL Source Code git master
Loading...
Searching...
No Matches
subscriptioncmds.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * subscriptioncmds.c
4 * subscription catalog manipulation functions
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/commands/subscriptioncmds.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/commit_ts.h"
18#include "access/htup_details.h"
19#include "access/table.h"
20#include "access/twophase.h"
21#include "access/xact.h"
22#include "catalog/catalog.h"
23#include "catalog/dependency.h"
24#include "catalog/indexing.h"
25#include "catalog/namespace.h"
28#include "catalog/pg_authid_d.h"
29#include "catalog/pg_database_d.h"
33#include "catalog/pg_type.h"
35#include "commands/defrem.h"
38#include "executor/executor.h"
39#include "foreign/foreign.h"
40#include "miscadmin.h"
41#include "nodes/makefuncs.h"
42#include "pgstat.h"
45#include "replication/origin.h"
46#include "replication/slot.h"
50#include "storage/lmgr.h"
51#include "utils/acl.h"
52#include "utils/builtins.h"
53#include "utils/guc.h"
54#include "utils/lsyscache.h"
55#include "utils/memutils.h"
56#include "utils/pg_lsn.h"
57#include "utils/syscache.h"
58
59/*
60 * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
61 * command. Each is a distinct single-bit flag, combinable with bitwise OR.
62 */
63#define SUBOPT_CONNECT 0x00000001
64#define SUBOPT_ENABLED 0x00000002
65#define SUBOPT_CREATE_SLOT 0x00000004
66#define SUBOPT_SLOT_NAME 0x00000008
67#define SUBOPT_COPY_DATA 0x00000010
68#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
69#define SUBOPT_REFRESH 0x00000040
70#define SUBOPT_BINARY 0x00000080
71#define SUBOPT_STREAMING 0x00000100
72#define SUBOPT_TWOPHASE_COMMIT 0x00000200
73#define SUBOPT_DISABLE_ON_ERR 0x00000400
74#define SUBOPT_PASSWORD_REQUIRED 0x00000800
75#define SUBOPT_RUN_AS_OWNER 0x00001000
76#define SUBOPT_FAILOVER 0x00002000
77#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
78#define SUBOPT_MAX_RETENTION_DURATION 0x00008000
79#define SUBOPT_WAL_RECEIVER_TIMEOUT 0x00010000
80#define SUBOPT_LSN 0x00020000
81#define SUBOPT_ORIGIN 0x00040000
82
83/* Check whether 'val' has every bit of 'bits' set (all of them, not just any). */
84#define IsSet(val, bits) (((val) & (bits)) == (bits))
85
86/*
87 * Structure to hold a bitmap representing the user-provided CREATE/ALTER
88 * SUBSCRIPTION command options and the parsed/default values of each of them.
89 */
113
114/*
115 * PublicationRelKind represents a relation included in a publication.
116 * It stores the schema-qualified relation name (rv) and its kind (relkind).
117 */
123
124static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
126 List *publications, bool copydata,
128 char *origin,
130 int subrel_count, char *subname);
132 List *publications,
133 bool copydata, char *origin,
135 int subrel_count,
136 char *subname);
138static void check_duplicates_in_publist(List *publist, Datum *datums);
139static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
140static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
141static void CheckAlterSubOption(Subscription *sub, const char *option,
142 bool slot_needs_update, bool isTopLevel);
143
144
145/*
146 * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
147 *
148 * Since not all options can be specified in both commands, this function
149 * will report an error if mutually exclusive options are specified.
150 */
151static void
154{
155 ListCell *lc;
156
157 /* Start out with cleared opts. */
158 memset(opts, 0, sizeof(SubOpts));
159
160 /* caller must expect some option */
162
163 /* If connect option is supported, these others also need to be. */
167
168 /* Set default values for the supported options. */
170 opts->connect = true;
172 opts->enabled = true;
174 opts->create_slot = true;
176 opts->copy_data = true;
178 opts->refresh = true;
180 opts->binary = false;
182 opts->streaming = LOGICALREP_STREAM_PARALLEL;
184 opts->twophase = false;
186 opts->disableonerr = false;
188 opts->passwordrequired = true;
190 opts->runasowner = false;
192 opts->failover = false;
194 opts->retaindeadtuples = false;
196 opts->maxretention = 0;
199
200 /* Parse options */
201 foreach(lc, stmt_options)
202 {
203 DefElem *defel = (DefElem *) lfirst(lc);
204
206 strcmp(defel->defname, "connect") == 0)
207 {
208 if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
210
211 opts->specified_opts |= SUBOPT_CONNECT;
212 opts->connect = defGetBoolean(defel);
213 }
215 strcmp(defel->defname, "enabled") == 0)
216 {
217 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
219
220 opts->specified_opts |= SUBOPT_ENABLED;
221 opts->enabled = defGetBoolean(defel);
222 }
224 strcmp(defel->defname, "create_slot") == 0)
225 {
226 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
228
229 opts->specified_opts |= SUBOPT_CREATE_SLOT;
230 opts->create_slot = defGetBoolean(defel);
231 }
233 strcmp(defel->defname, "slot_name") == 0)
234 {
235 if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
237
238 opts->specified_opts |= SUBOPT_SLOT_NAME;
239 opts->slot_name = defGetString(defel);
240
241 /* Setting slot_name = NONE is treated as no slot name. */
242 if (strcmp(opts->slot_name, "none") == 0)
243 opts->slot_name = NULL;
244 else
245 ReplicationSlotValidateName(opts->slot_name, false, ERROR);
246 }
248 strcmp(defel->defname, "copy_data") == 0)
249 {
250 if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
252
253 opts->specified_opts |= SUBOPT_COPY_DATA;
254 opts->copy_data = defGetBoolean(defel);
255 }
257 strcmp(defel->defname, "synchronous_commit") == 0)
258 {
259 if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
261
262 opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
263 opts->synchronous_commit = defGetString(defel);
264
265 /* Test if the given value is valid for synchronous_commit GUC. */
266 (void) set_config_option("synchronous_commit", opts->synchronous_commit,
268 false, 0, false);
269 }
271 strcmp(defel->defname, "refresh") == 0)
272 {
273 if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
275
276 opts->specified_opts |= SUBOPT_REFRESH;
277 opts->refresh = defGetBoolean(defel);
278 }
280 strcmp(defel->defname, "binary") == 0)
281 {
282 if (IsSet(opts->specified_opts, SUBOPT_BINARY))
284
285 opts->specified_opts |= SUBOPT_BINARY;
286 opts->binary = defGetBoolean(defel);
287 }
289 strcmp(defel->defname, "streaming") == 0)
290 {
291 if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
293
294 opts->specified_opts |= SUBOPT_STREAMING;
295 opts->streaming = defGetStreamingMode(defel);
296 }
298 strcmp(defel->defname, "two_phase") == 0)
299 {
300 if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
302
303 opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
304 opts->twophase = defGetBoolean(defel);
305 }
307 strcmp(defel->defname, "disable_on_error") == 0)
308 {
309 if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
311
312 opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
313 opts->disableonerr = defGetBoolean(defel);
314 }
316 strcmp(defel->defname, "password_required") == 0)
317 {
318 if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
320
321 opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
322 opts->passwordrequired = defGetBoolean(defel);
323 }
325 strcmp(defel->defname, "run_as_owner") == 0)
326 {
327 if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
329
330 opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
331 opts->runasowner = defGetBoolean(defel);
332 }
334 strcmp(defel->defname, "failover") == 0)
335 {
336 if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
338
339 opts->specified_opts |= SUBOPT_FAILOVER;
340 opts->failover = defGetBoolean(defel);
341 }
343 strcmp(defel->defname, "retain_dead_tuples") == 0)
344 {
345 if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
347
348 opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
349 opts->retaindeadtuples = defGetBoolean(defel);
350 }
352 strcmp(defel->defname, "max_retention_duration") == 0)
353 {
354 if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
356
357 opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
358 opts->maxretention = defGetInt32(defel);
359 }
361 strcmp(defel->defname, "origin") == 0)
362 {
363 if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
365
366 opts->specified_opts |= SUBOPT_ORIGIN;
367 pfree(opts->origin);
368
369 /*
370 * Even though the "origin" parameter allows only "none" and "any"
371 * values, it is implemented as a string type so that the
372 * parameter can be extended in future versions to support
373 * filtering using origin names specified by the user.
374 */
375 opts->origin = defGetString(defel);
376
377 if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
381 errmsg("unrecognized origin value: \"%s\"", opts->origin));
382 }
383 else if (IsSet(supported_opts, SUBOPT_LSN) &&
384 strcmp(defel->defname, "lsn") == 0)
385 {
386 char *lsn_str = defGetString(defel);
387 XLogRecPtr lsn;
388
389 if (IsSet(opts->specified_opts, SUBOPT_LSN))
391
392 /* Setting lsn = NONE is treated as resetting LSN */
393 if (strcmp(lsn_str, "none") == 0)
394 lsn = InvalidXLogRecPtr;
395 else
396 {
397 /* Parse the argument as LSN */
400
401 if (!XLogRecPtrIsValid(lsn))
404 errmsg("invalid WAL location (LSN): %s", lsn_str)));
405 }
406
407 opts->specified_opts |= SUBOPT_LSN;
408 opts->lsn = lsn;
409 }
411 strcmp(defel->defname, "wal_receiver_timeout") == 0)
412 {
413 bool parsed;
414 int val;
415
416 if (IsSet(opts->specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
418
419 opts->specified_opts |= SUBOPT_WAL_RECEIVER_TIMEOUT;
420 opts->wal_receiver_timeout = defGetString(defel);
421
422 /*
423 * Test if the given value is valid for wal_receiver_timeout GUC.
424 * Skip this test if the value is -1, since -1 is allowed for the
425 * wal_receiver_timeout subscription option, but not for the GUC
426 * itself.
427 */
428 parsed = parse_int(opts->wal_receiver_timeout, &val, 0, NULL);
429 if (!parsed || val != -1)
430 (void) set_config_option("wal_receiver_timeout", opts->wal_receiver_timeout,
432 false, 0, false);
433 }
434 else
437 errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
438 }
439
440 /*
441 * We've been explicitly asked to not connect, that requires some
442 * additional processing.
443 */
444 if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
445 {
446 /* Check for incompatible options from the user. */
447 if (opts->enabled &&
448 IsSet(opts->specified_opts, SUBOPT_ENABLED))
451 /*- translator: both %s are strings of the form "option = value" */
452 errmsg("%s and %s are mutually exclusive options",
453 "connect = false", "enabled = true")));
454
455 if (opts->create_slot &&
456 IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
459 errmsg("%s and %s are mutually exclusive options",
460 "connect = false", "create_slot = true")));
461
462 if (opts->copy_data &&
463 IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
466 errmsg("%s and %s are mutually exclusive options",
467 "connect = false", "copy_data = true")));
468
469 /* Change the defaults of other options. */
470 opts->enabled = false;
471 opts->create_slot = false;
472 opts->copy_data = false;
473 }
474
475 /*
476 * Do additional checking for disallowed combination when slot_name = NONE
477 * was used.
478 */
479 if (!opts->slot_name &&
480 IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
481 {
482 if (opts->enabled)
483 {
484 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
487 /*- translator: both %s are strings of the form "option = value" */
488 errmsg("%s and %s are mutually exclusive options",
489 "slot_name = NONE", "enabled = true")));
490 else
493 /*- translator: both %s are strings of the form "option = value" */
494 errmsg("subscription with %s must also set %s",
495 "slot_name = NONE", "enabled = false")));
496 }
497
498 if (opts->create_slot)
499 {
500 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
503 /*- translator: both %s are strings of the form "option = value" */
504 errmsg("%s and %s are mutually exclusive options",
505 "slot_name = NONE", "create_slot = true")));
506 else
509 /*- translator: both %s are strings of the form "option = value" */
510 errmsg("subscription with %s must also set %s",
511 "slot_name = NONE", "create_slot = false")));
512 }
513 }
514}
515
516/*
517 * Check that the specified publications are present on the publisher.
518 */
519static void
521{
522 WalRcvExecResult *res;
523 StringInfoData cmd;
524 TupleTableSlot *slot;
526 Oid tableRow[1] = {TEXTOID};
527
528 initStringInfo(&cmd);
529 appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
530 " pg_catalog.pg_publication t WHERE\n"
531 " t.pubname IN (");
532 GetPublicationsStr(publications, &cmd, true);
533 appendStringInfoChar(&cmd, ')');
534
535 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
536 pfree(cmd.data);
537
538 if (res->status != WALRCV_OK_TUPLES)
540 errmsg("could not receive list of publications from the publisher: %s",
541 res->err));
542
543 publicationsCopy = list_copy(publications);
544
545 /* Process publication(s). */
547 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
548 {
549 char *pubname;
550 bool isnull;
551
552 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
553 Assert(!isnull);
554
555 /* Delete the publication present in publisher from the list. */
557 ExecClearTuple(slot);
558 }
559
561
563
565 {
566 /* Prepare the list of non-existent publication(s) for error message. */
568
570
574 errmsg_plural("publication %s does not exist on the publisher",
575 "publications %s do not exist on the publisher",
577 pubnames.data));
578 }
579}
580
581/*
582 * Auxiliary function to build a text array out of a list of String nodes.
583 */
584static Datum
586{
587 ArrayType *arr;
588 Datum *datums;
589 MemoryContext memcxt;
591
592 /* Create memory context for temporary allocations. */
594 "publicationListToArray to array",
597
599
601
603
605
606 MemoryContextDelete(memcxt);
607
608 return PointerGetDatum(arr);
609}
610
611/*
612 * Create new subscription.
613 */
616 bool isTopLevel)
617{
618 Relation rel;
620 Oid subid;
621 bool nulls[Natts_pg_subscription];
623 Oid owner = GetUserId();
625 Oid serverid;
626 char *conninfo;
628 List *publications;
630 SubOpts opts = {0};
632
633 /*
634 * Parse and check options.
635 *
636 * Connection and publication should not be specified here.
637 */
648
649 /*
650 * Since creating a replication slot is not transactional, rolling back
651 * the transaction leaves the created replication slot. So we cannot run
652 * CREATE SUBSCRIPTION inside a transaction block if creating a
653 * replication slot.
654 */
655 if (opts.create_slot)
656 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
657
658 /*
659 * We don't want to allow unprivileged users to be able to trigger
660 * attempts to access arbitrary network destinations, so require the user
661 * to have been specifically authorized to create subscriptions.
662 */
666 errmsg("permission denied to create subscription"),
667 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
668 "pg_create_subscription")));
669
670 /*
671 * Since a subscription is a database object, we also check for CREATE
672 * permission on the database.
673 */
675 owner, ACL_CREATE);
676 if (aclresult != ACLCHECK_OK)
679
680 /*
681 * Non-superusers are required to set a password for authentication, and
682 * that password must be used by the target server, but the superuser can
683 * exempt a subscription from this requirement.
684 */
685 if (!opts.passwordrequired && !superuser_arg(owner))
688 errmsg("password_required=false is superuser-only"),
689 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
690
691 /*
692 * If built with appropriate switch, whine when regression-testing
693 * conventions for subscription names are violated.
694 */
695#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
696 if (strncmp(stmt->subname, "regress_", 8) != 0)
697 elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
698#endif
699
701
702 /* Check if name is used */
705 if (OidIsValid(subid))
706 {
709 errmsg("subscription \"%s\" already exists",
710 stmt->subname)));
711 }
712
713 /*
714 * Ensure that system configuration parameters are set appropriately to
715 * support retain_dead_tuples and max_retention_duration.
716 */
718 opts.retaindeadtuples, opts.retaindeadtuples,
719 (opts.maxretention > 0));
720
721 if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
722 opts.slot_name == NULL)
723 opts.slot_name = stmt->subname;
724
725 /* The default for synchronous_commit of subscriptions is off. */
726 if (opts.synchronous_commit == NULL)
727 opts.synchronous_commit = "off";
728
729 /*
730 * The default for wal_receiver_timeout of subscriptions is -1, which
731 * means the value is inherited from the server configuration, command
732 * line, or role/database settings.
733 */
734 if (opts.wal_receiver_timeout == NULL)
735 opts.wal_receiver_timeout = "-1";
736
737 /* Load the library providing us libpq calls. */
738 load_file("libpqwalreceiver", false);
739
740 if (stmt->servername)
741 {
742 ForeignServer *server;
743
744 Assert(!stmt->conninfo);
745 conninfo = NULL;
746
747 server = GetForeignServerByName(stmt->servername, false);
749 if (aclresult != ACLCHECK_OK)
751
752 /* make sure a user mapping exists */
753 GetUserMapping(owner, server->serverid);
754
755 serverid = server->serverid;
756 conninfo = ForeignServerConnectionString(owner, server);
757 }
758 else
759 {
760 Assert(stmt->conninfo);
761
762 serverid = InvalidOid;
763 conninfo = stmt->conninfo;
764 }
765
766 /* Check the connection info string. */
767 walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
768
769 publications = stmt->publication;
770
771 /* Everything ok, form a new tuple. */
772 memset(values, 0, sizeof(values));
773 memset(nulls, false, sizeof(nulls));
774
787 CharGetDatum(opts.twophase ?
795 BoolGetDatum(opts.retaindeadtuples);
797 Int32GetDatum(opts.maxretention);
799 Int32GetDatum(opts.retaindeadtuples);
801 if (!OidIsValid(serverid))
803 CStringGetTextDatum(conninfo);
804 else
805 nulls[Anum_pg_subscription_subconninfo - 1] = true;
806 if (opts.slot_name)
809 else
810 nulls[Anum_pg_subscription_subslotname - 1] = true;
812 CStringGetTextDatum(opts.synchronous_commit);
814 CStringGetTextDatum(opts.wal_receiver_timeout);
816 publicationListToArray(publications);
819
821
822 /* Insert tuple into catalog. */
825
827
829
830 if (stmt->servername)
831 {
833
834 Assert(OidIsValid(serverid));
835
838 }
839
840 /*
841 * A replication origin is currently created for all subscriptions,
842 * including those that only contain sequences or are otherwise empty.
843 *
844 * XXX: While this is technically unnecessary, optimizing it would require
845 * additional logic to skip origin creation during DDL operations and
846 * apply workers initialization, and to handle origin creation dynamically
847 * when tables are added to the subscription. It is not clear whether
848 * preventing creation of origins is worth additional complexity.
849 */
852
853 /*
854 * Connect to remote side to execute requested commands and fetch table
855 * and sequence info.
856 */
857 if (opts.connect)
858 {
859 char *err;
862
863 /* Try to connect to the publisher. */
864 must_use_password = !superuser_arg(owner) && opts.passwordrequired;
865 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
866 stmt->subname, &err);
867 if (!wrconn)
870 errmsg("subscription \"%s\" could not connect to the publisher: %s",
871 stmt->subname, err)));
872
873 PG_TRY();
874 {
875 bool has_tables = false;
876 List *pubrels;
877 char relation_state;
878
879 check_publications(wrconn, publications);
881 opts.copy_data,
882 opts.retaindeadtuples, opts.origin,
883 NULL, 0, stmt->subname);
885 opts.copy_data, opts.origin,
886 NULL, 0, stmt->subname);
887
888 if (opts.retaindeadtuples)
890
891 /*
892 * Set sync state based on whether we were asked to copy data or
893 * not.
894 */
896
897 /*
898 * Build local relation status info. Relations are for both tables
899 * and sequences from the publisher.
900 */
901 pubrels = fetch_relation_list(wrconn, publications);
902
904 {
905 Oid relid;
906 char relkind;
907 RangeVar *rv = pubrelinfo->rv;
908
909 relid = RangeVarGetRelid(rv, AccessShareLock, false);
910 relkind = get_rel_relkind(relid);
911
912 /* Check for supported relkind. */
913 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
914 rv->schemaname, rv->relname);
915 has_tables |= (relkind != RELKIND_SEQUENCE);
917 InvalidXLogRecPtr, true);
918 }
919
920 /*
921 * If requested, create permanent slot for the subscription. We
922 * won't use the initial snapshot for anything, so no need to
923 * export it.
924 *
925 * XXX: Similar to origins, it is not clear whether preventing the
926 * slot creation for empty and sequence-only subscriptions is
927 * worth additional complexity.
928 */
929 if (opts.create_slot)
930 {
931 bool twophase_enabled = false;
932
933 Assert(opts.slot_name);
934
935 /*
936 * Even if two_phase is set, don't create the slot with
937 * two-phase enabled. Will enable it once all the tables are
938 * synced and ready. This avoids race-conditions like prepared
939 * transactions being skipped due to changes not being applied
940 * due to checks in should_apply_changes_for_rel() when
941 * tablesync for the corresponding tables are in progress. See
942 * comments atop worker.c.
943 *
944 * Note that if tables were specified but copy_data is false
945 * then it is safe to enable two_phase up-front because those
946 * tables are already initially in READY state. When the
947 * subscription has no tables, we leave the twophase state as
948 * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
949 * PUBLICATION to work.
950 */
951 if (opts.twophase && !opts.copy_data && has_tables)
952 twophase_enabled = true;
953
955 opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
956
959
961 (errmsg("created replication slot \"%s\" on publisher",
962 opts.slot_name)));
963 }
964 }
965 PG_FINALLY();
966 {
968 }
969 PG_END_TRY();
970 }
971 else
973 (errmsg("subscription was created, but is not connected"),
974 errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
975
977
979
980 /*
981 * Notify the launcher to start the apply worker if the subscription is
982 * enabled, or to create the conflict detection slot if retain_dead_tuples
983 * is enabled.
984 *
985 * Creating the conflict detection slot is essential even when the
986 * subscription is not enabled. This ensures that dead tuples are
987 * retained, which is necessary for accurately identifying the type of
988 * conflict during replication.
989 */
990 if (opts.enabled || opts.retaindeadtuples)
992
994
995 return myself;
996}
997
998static void
1001{
1002 char *err;
1003 List *pubrels = NIL;
1009 int subrel_count;
1010 ListCell *lc;
1011 int off;
1012 int tbl_count = 0;
1013 int seq_count = 0;
1014 Relation rel = NULL;
1015 typedef struct SubRemoveRels
1016 {
1017 Oid relid;
1018 char state;
1019 } SubRemoveRels;
1020
1022 bool must_use_password;
1023
1024 /* Load the library providing us libpq calls. */
1025 load_file("libpqwalreceiver", false);
1026
1027 /* Try to connect to the publisher. */
1029 wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1030 sub->name, &err);
1031 if (!wrconn)
1032 ereport(ERROR,
1034 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1035 sub->name, err)));
1036
1037 PG_TRY();
1038 {
1041
1042 /* Get the relation list from publisher. */
1044
1045 /* Get local relation list. */
1046 subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
1048
1049 /*
1050 * Build qsorted arrays of local table oids and sequence oids for
1051 * faster lookup. This can potentially contain all tables and
1052 * sequences in the database so speed of lookup is important.
1053 *
1054 * We do not yet know the exact count of tables and sequences, so we
1055 * allocate separate arrays for table OIDs and sequence OIDs based on
1056 * the total number of relations (subrel_count).
1057 */
1060 foreach(lc, subrel_states)
1061 {
1063
1064 if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
1065 subseq_local_oids[seq_count++] = relstate->relid;
1066 else
1067 subrel_local_oids[tbl_count++] = relstate->relid;
1068 }
1069
1072 sub->retaindeadtuples, sub->origin,
1074 sub->name);
1075
1078 copy_data, sub->origin,
1080 sub->name);
1081
1082 /*
1083 * Walk over the remote relations and try to match them to locally
1084 * known relations. If the relation is not known locally create a new
1085 * state for it.
1086 *
1087 * Also builds array of local oids of remote relations for the next
1088 * step.
1089 */
1090 off = 0;
1092
1094 {
1095 RangeVar *rv = pubrelinfo->rv;
1096 Oid relid;
1097 char relkind;
1098
1099 relid = RangeVarGetRelid(rv, AccessShareLock, false);
1100 relkind = get_rel_relkind(relid);
1101
1102 /* Check for supported relkind. */
1103 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
1104 rv->schemaname, rv->relname);
1105
1106 pubrel_local_oids[off++] = relid;
1107
1108 if (!bsearch(&relid, subrel_local_oids,
1109 tbl_count, sizeof(Oid), oid_cmp) &&
1110 !bsearch(&relid, subseq_local_oids,
1111 seq_count, sizeof(Oid), oid_cmp))
1112 {
1113 AddSubscriptionRelState(sub->oid, relid,
1115 InvalidXLogRecPtr, true);
1117 errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
1118 relkind == RELKIND_SEQUENCE ? "sequence" : "table",
1119 rv->schemaname, rv->relname, sub->name));
1120 }
1121 }
1122
1123 /*
1124 * Next, remove state for tables we no longer care about, using the
1125 * data we collected above.
1126 */
1128
1129 for (off = 0; off < tbl_count; off++)
1130 {
1131 Oid relid = subrel_local_oids[off];
1132
1133 if (!bsearch(&relid, pubrel_local_oids,
1134 list_length(pubrels), sizeof(Oid), oid_cmp))
1135 {
1136 char state;
1137 XLogRecPtr statelsn;
1139
1140 /*
1141 * Lock pg_subscription_rel with AccessExclusiveLock to
1142 * prevent any race conditions with the apply worker
1143 * re-launching workers at the same time this code is trying
1144 * to remove those tables.
1145 *
1146 * Even if a new worker for this particular rel is restarted, it
1147 * won't be able to make any progress as we hold an exclusive
1148 * lock on pg_subscription_rel until the transaction ends. It
1149 * will simply exit as there is no corresponding rel entry.
1150 *
1151 * This locking also ensures that the state of rels won't
1152 * change till we are done with this refresh operation.
1153 */
1154 if (!rel)
1156
1157 /* Last known rel state. */
1158 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
1159
1160 RemoveSubscriptionRel(sub->oid, relid);
1161
1162 remove_rel->relid = relid;
1163 remove_rel->state = state;
1164
1166
1168
1169 /*
1170 * For READY state, we would have already dropped the
1171 * tablesync origin.
1172 */
1174 {
1175 char originname[NAMEDATALEN];
1176
1177 /*
1178 * Drop the tablesync's origin tracking if exists.
1179 *
1180 * It is possible that the origin is not yet created for
1181 * tablesync worker, this can happen for the states before
1182 * SUBREL_STATE_DATASYNC. The tablesync worker or apply
1183 * worker can also concurrently try to drop the origin and
1184 * by this time the origin might be already removed. For
1185 * these reasons, passing missing_ok = true.
1186 */
1188 sizeof(originname));
1189 replorigin_drop_by_name(originname, true, false);
1190 }
1191
1193 (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1195 get_rel_name(relid),
1196 sub->name)));
1197 }
1198 }
1199
1200 /*
1201 * Drop the tablesync slots associated with removed tables. This has
1202 * to be at the end because otherwise if there is an error while doing
1203 * the database operations we won't be able to rollback dropped slots.
1204 */
1206 {
1207 if (sub_remove_rel->state != SUBREL_STATE_READY &&
1209 {
1210 char syncslotname[NAMEDATALEN] = {0};
1211
1212 /*
1213 * For READY/SYNCDONE states we know the tablesync slot has
1214 * already been dropped by the tablesync worker.
1215 *
1216 * For other states, there is no certainty, maybe the slot
1217 * does not exist yet. Also, if we fail after removing some of
1218 * the slots, next time, it will again try to drop already
1219 * dropped slots and fail. For these reasons, we allow
1220 * missing_ok = true for the drop.
1221 */
1223 syncslotname, sizeof(syncslotname));
1225 }
1226 }
1227
1228 /*
1229 * Next, remove state for sequences we no longer care about, using
1230 * the data we collected above.
1231 */
1232 for (off = 0; off < seq_count; off++)
1233 {
1234 Oid relid = subseq_local_oids[off];
1235
1236 if (!bsearch(&relid, pubrel_local_oids,
1237 list_length(pubrels), sizeof(Oid), oid_cmp))
1238 {
1239 /*
1240 * This locking ensures that the state of rels won't change
1241 * till we are done with this refresh operation.
1242 */
1243 if (!rel)
1245
1246 RemoveSubscriptionRel(sub->oid, relid);
1247
1249 errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
1251 get_rel_name(relid),
1252 sub->name));
1253 }
1254 }
1255 }
1256 PG_FINALLY();
1257 {
1259 }
1260 PG_END_TRY();
1261
1262 if (rel)
1263 table_close(rel, NoLock);
1264}
1265
1266/*
1267 * Marks all sequences with INIT state.
1268 */
1269static void
1271{
1272 char *err = NULL;
1274 bool must_use_password;
1275
1276 /* Load the library providing us libpq calls. */
1277 load_file("libpqwalreceiver", false);
1278
1279 /* Try to connect to the publisher. */
1281 wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1282 sub->name, &err);
1283 if (!wrconn)
1284 ereport(ERROR,
1286 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1287 sub->name, err));
1288
1289 PG_TRY();
1290 {
1292
1294 sub->origin, NULL, 0, sub->name);
1295
1296 /* Get local sequence list. */
1297 subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
1299 {
1300 Oid relid = subrel->relid;
1301
1303 InvalidXLogRecPtr, false);
1305 errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
1307 get_rel_name(relid),
1308 sub->name));
1309 }
1310 }
1311 PG_FINALLY();
1312 {
1314 }
1315 PG_END_TRY();
1316}
1317
1318/*
1319 * Common checks for altering failover, two_phase, and retain_dead_tuples
1320 * options.
1321 */
1322static void
1324 bool slot_needs_update, bool isTopLevel)
1325{
1326 Assert(strcmp(option, "failover") == 0 ||
1327 strcmp(option, "two_phase") == 0 ||
1328 strcmp(option, "retain_dead_tuples") == 0);
1329
1330 /*
1331 * Altering the retain_dead_tuples option does not update the slot on the
1332 * publisher.
1333 */
1334 Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1335
1336 /*
1337 * Do not allow changing the option if the subscription is enabled. This
1338 * is because both failover and two_phase options of the slot on the
1339 * publisher cannot be modified if the slot is currently acquired by the
1340 * existing walsender.
1341 *
1342 * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1343 * the publisher by the existing walsender, so we could have allowed that
1344 * even when the subscription is enabled. But we kept this restriction for
1345 * the sake of consistency and simplicity.
1346 *
1347 * Additionally, do not allow changing the retain_dead_tuples option when
1348 * the subscription is enabled to prevent race conditions arising from the
1349 * new option value being acknowledged asynchronously by the launcher and
1350 * apply workers.
1351 *
1352 * Without the restriction, a race condition may arise when a user
1353 * disables and immediately re-enables the retain_dead_tuples option. In
1354 * this case, the launcher might drop the slot upon noticing the disabled
1355 * action, while the apply worker may keep maintaining
1356 * oldest_nonremovable_xid without noticing the option change. During this
1357 * period, a transaction ID wraparound could falsely make this ID appear
1358 * as if it originates from the future w.r.t the transaction ID stored in
1359 * the slot maintained by launcher.
1360 *
1361 * Similarly, if the user enables retain_dead_tuples concurrently with the
1362 * launcher starting the worker, the apply worker may start calculating
1363 * oldest_nonremovable_xid before the launcher notices the enable action.
1364 * Consequently, the launcher may update slot.xmin to a newer value than
1365 * that maintained by the worker. In subsequent cycles, upon integrating
1366 * the worker's oldest_nonremovable_xid, the launcher might detect a
1367 * retreat in the calculated xmin, necessitating additional handling.
1368 *
1369 * XXX To address the above race conditions, we could define
1370 * oldest_nonremovable_xid as FullTransactionId and add a check to
1371 * disallow retreating the conflict slot's xmin. For now, we keep the
1372 * implementation simple by disallowing changes to retain_dead_tuples,
1373 * but in the future we can change this after some more analysis.
1374 *
1375 * Note that we could restrict only the enabling of retain_dead_tuples to
1376 * avoid the race conditions described above, but we maintain the
1377 * restriction for both enable and disable operations for the sake of
1378 * consistency.
1379 */
1380 if (sub->enabled)
1381 ereport(ERROR,
1383 errmsg("cannot set option \"%s\" for enabled subscription",
1384 option)));
1385
1387 {
1388 StringInfoData cmd;
1389
1390 /*
1391 * A valid slot must be associated with the subscription for us to
1392 * modify any of the slot's properties.
1393 */
1394 if (!sub->slotname)
1395 ereport(ERROR,
1397 errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1398 option)));
1399
1400 /* The changed option of the slot can't be rolled back. */
1401 initStringInfo(&cmd);
1402 appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1403
1405 pfree(cmd.data);
1406 }
1407}
1408
1409/*
1410 * Alter the existing subscription.
1411 */
1414 bool isTopLevel)
1415{
1416 Relation rel;
1418 bool nulls[Natts_pg_subscription];
1421 HeapTuple tup;
1422 Oid subid;
1423 bool update_tuple = false;
1424 bool update_failover = false;
1425 bool update_two_phase = false;
1426 bool check_pub_rdt = false;
1427 bool retain_dead_tuples;
1428 int max_retention;
1429 bool retention_active;
1430 char *origin;
1431 Subscription *sub;
1434 SubOpts opts = {0};
1435
1437
1438 /* Fetch the existing tuple. */
1440 CStringGetDatum(stmt->subname));
1441
1442 if (!HeapTupleIsValid(tup))
1443 ereport(ERROR,
1445 errmsg("subscription \"%s\" does not exist",
1446 stmt->subname)));
1447
1449 subid = form->oid;
1450
1451 /* must be owner */
1454 stmt->subname);
1455
1456 /*
1457 * Skip ACL checks on the subscription's foreign server, if any. If
1458 * changing the server (or replacing it with a raw connection), then the
1459 * old one will be removed anyway. If changing something unrelated,
1460 * there's no need to do an additional ACL check here; that will be done
1461 * by the subscription worker anyway.
1462 */
1463 sub = GetSubscription(subid, false, false);
1464
1466 origin = sub->origin;
1469
1470 /*
1471 * Don't allow non-superuser modification of a subscription with
1472 * password_required=false.
1473 */
1474 if (!sub->passwordrequired && !superuser())
1475 ereport(ERROR,
1477 errmsg("password_required=false is superuser-only"),
1478 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1479
1480 /* Lock the subscription so nobody else can do anything with it. */
1482
1483 /* Form a new tuple. */
1484 memset(values, 0, sizeof(values));
1485 memset(nulls, false, sizeof(nulls));
1486 memset(replaces, false, sizeof(replaces));
1487
1489
1490 switch (stmt->kind)
1491 {
1493 {
1504
1505 parse_subscription_options(pstate, stmt->options,
1507
1508 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1509 {
1510 /*
1511 * The subscription must be disabled to allow setting
1512 * slot_name to 'none'; otherwise, the apply worker will
1513 * repeatedly try to stream data using a slot_name that
1514 * does not exist on the publisher and that the user will
1515 * not be allowed to create.
1516 */
1517 if (sub->enabled && !opts.slot_name)
1518 ereport(ERROR,
1520 errmsg("cannot set %s for enabled subscription",
1521 "slot_name = NONE")));
1522
1523 if (opts.slot_name)
1526 else
1527 nulls[Anum_pg_subscription_subslotname - 1] = true;
1529 }
1530
1531 if (opts.synchronous_commit)
1532 {
1534 CStringGetTextDatum(opts.synchronous_commit);
1536 }
1537
1538 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1539 {
1541 BoolGetDatum(opts.binary);
1543 }
1544
1545 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1546 {
1548 CharGetDatum(opts.streaming);
1550 }
1551
1552 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1553 {
1555 = BoolGetDatum(opts.disableonerr);
1557 = true;
1558 }
1559
1560 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1561 {
1562 /* Non-superuser may not disable password_required. */
1563 if (!opts.passwordrequired && !superuser())
1564 ereport(ERROR,
1566 errmsg("password_required=false is superuser-only"),
1567 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1568
1570 = BoolGetDatum(opts.passwordrequired);
1572 = true;
1573 }
1574
1575 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1576 {
1578 BoolGetDatum(opts.runasowner);
1580 }
1581
1582 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1583 {
1584 /*
1585 * We need to update both the slot and the subscription
1586 * for the two_phase option. We can enable the two_phase
1587 * option for a slot only once the initial data
1588 * synchronization is done. This is to avoid missing some
1589 * data as explained in comments atop worker.c.
1590 */
1591 update_two_phase = !opts.twophase;
1592
1593 CheckAlterSubOption(sub, "two_phase", update_two_phase,
1594 isTopLevel);
1595
1596 /*
1597 * Modifying the two_phase slot option requires a slot
1598 * lookup by slot name, so changing the slot name at the
1599 * same time is not allowed.
1600 */
1601 if (update_two_phase &&
1602 IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1603 ereport(ERROR,
1605 errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1606
1607 /*
1608 * Note that workers may still survive even if the
1609 * subscription has been disabled.
1610 *
1611 * Ensure workers have already exited to avoid
1612 * getting prepared transactions while we are disabling
1613 * the two_phase option. Otherwise, the changes of an
1614 * already prepared transaction can be replicated again
1615 * along with its corresponding commit, leading to
1616 * duplicate data or errors.
1617 */
1618 if (logicalrep_workers_find(subid, true, true))
1619 ereport(ERROR,
1621 errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1622 errhint("Try again after some time.")));
1623
1624 /*
1625 * two_phase cannot be disabled if there are any
1626 * uncommitted prepared transactions present otherwise it
1627 * can lead to duplicate data or errors as explained in
1628 * the comment above.
1629 */
1630 if (update_two_phase &&
1632 LookupGXactBySubid(subid))
1633 ereport(ERROR,
1635 errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1636 errhint("Resolve these transactions and try again.")));
1637
1638 /* Change system catalog accordingly */
1640 CharGetDatum(opts.twophase ?
1644 }
1645
1646 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1647 {
1648 /*
1649 * Similar to the two_phase case above, we need to update
1650 * the failover option for both the slot and the
1651 * subscription.
1652 */
1653 update_failover = true;
1654
1655 CheckAlterSubOption(sub, "failover", update_failover,
1656 isTopLevel);
1657
1659 BoolGetDatum(opts.failover);
1661 }
1662
1663 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1664 {
1666 BoolGetDatum(opts.retaindeadtuples);
1668
1669 /*
1670 * Update the retention status only if there's a change in
1671 * the retain_dead_tuples option value.
1672 *
1673 * Automatically marking retention as active when
1674 * retain_dead_tuples is enabled may not always be ideal,
1675 * especially if retention was previously stopped and the
1676 * user toggles retain_dead_tuples without adjusting the
1677 * publisher workload. However, this behavior provides a
1678 * convenient way for users to manually refresh the
1679 * retention status. Since retention will be stopped again
1680 * unless the publisher workload is reduced, this approach
1681 * is acceptable for now.
1682 */
1683 if (opts.retaindeadtuples != sub->retaindeadtuples)
1684 {
1686 BoolGetDatum(opts.retaindeadtuples);
1688
1689 retention_active = opts.retaindeadtuples;
1690 }
1691
1692 CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1693
1694 /*
1695 * Workers may continue running even after the
1696 * subscription has been disabled.
1697 *
1698 * To prevent race conditions (as described in
1699 * CheckAlterSubOption()), ensure that all worker
1700 * processes have already exited before proceeding.
1701 */
1702 if (logicalrep_workers_find(subid, true, true))
1703 ereport(ERROR,
1705 errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1706 errhint("Try again after some time.")));
1707
1708 /*
1709 * Notify the launcher to manage the replication slot for
1710 * conflict detection. This ensures that replication slot
1711 * is efficiently handled (created, updated, or dropped)
1712 * in response to any configuration changes.
1713 */
1715
1716 check_pub_rdt = opts.retaindeadtuples;
1717 retain_dead_tuples = opts.retaindeadtuples;
1718 }
1719
1720 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1721 {
1723 Int32GetDatum(opts.maxretention);
1725
1726 max_retention = opts.maxretention;
1727 }
1728
1729 /*
1730 * Ensure that system configuration parameters are set
1731 * appropriately to support retain_dead_tuples and
1732 * max_retention_duration.
1733 */
1734 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1735 IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1739 (max_retention > 0));
1740
1741 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1742 {
1744 CStringGetTextDatum(opts.origin);
1746
1747 /*
1748 * Check if changes from different origins may be received
1749 * from the publisher when the origin is changed to ANY
1750 * and retain_dead_tuples is enabled.
1751 */
1754
1755 origin = opts.origin;
1756 }
1757
1758 if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
1759 {
1761 CStringGetTextDatum(opts.wal_receiver_timeout);
1763 }
1764
1765 update_tuple = true;
1766 break;
1767 }
1768
1770 {
1771 parse_subscription_options(pstate, stmt->options,
1773 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1774
1775 if (!sub->slotname && opts.enabled)
1776 ereport(ERROR,
1778 errmsg("cannot enable subscription that does not have a slot name")));
1779
1780 /*
1781 * Check track_commit_timestamp only when enabling the
1782 * subscription in case it was disabled after creation. See
1783 * comments atop CheckSubDeadTupleRetention() for details.
1784 */
1785 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1787 sub->retentionactive, false);
1788
1790 BoolGetDatum(opts.enabled);
1792
1793 if (opts.enabled)
1795
1796 update_tuple = true;
1797
1798 /*
1799 * The subscription might be initially created with
1800 * connect=false and retain_dead_tuples=true, meaning the
1801 * remote server's status may not be checked. Ensure this
1802 * check is conducted now.
1803 */
1804 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1805 break;
1806 }
1807
1809 {
1813 char *conninfo;
1814
1815 /*
1816 * Remove what was there before, either another foreign server
1817 * or a connection string.
1818 */
1819 if (form->subserver)
1820 {
1823 ForeignServerRelationId, form->subserver);
1824 }
1825 else
1826 {
1827 nulls[Anum_pg_subscription_subconninfo - 1] = true;
1829 }
1830
1831 /*
1832 * Check that the subscription owner has USAGE privileges on
1833 * the server.
1834 */
1835 new_server = GetForeignServerByName(stmt->servername, false);
1837 new_server->serverid,
1838 form->subowner, ACL_USAGE);
1839 if (aclresult != ACLCHECK_OK)
1840 ereport(ERROR,
1842 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
1843 GetUserNameFromId(form->subowner, false),
1844 new_server->servername));
1845
1846 /* make sure a user mapping exists */
1847 GetUserMapping(form->subowner, new_server->serverid);
1848
1849 conninfo = ForeignServerConnectionString(form->subowner,
1850 new_server);
1851
1852 /* Load the library providing us libpq calls. */
1853 load_file("libpqwalreceiver", false);
1854 /* Check the connection info string. */
1855 walrcv_check_conninfo(conninfo,
1856 sub->passwordrequired && !sub->ownersuperuser);
1857
1860
1863
1864 update_tuple = true;
1865 }
1866 break;
1867
1869 /* remove reference to foreign server and dependencies, if present */
1870 if (form->subserver)
1871 {
1874 ForeignServerRelationId, form->subserver);
1875
1878 }
1879
1880 /* Load the library providing us libpq calls. */
1881 load_file("libpqwalreceiver", false);
1882 /* Check the connection info string. */
1883 walrcv_check_conninfo(stmt->conninfo,
1884 sub->passwordrequired && !sub->ownersuperuser);
1885
1887 CStringGetTextDatum(stmt->conninfo);
1889 update_tuple = true;
1890
1891 /*
1892 * Since the remote server configuration might have changed,
1893 * perform a check to ensure it permits enabling
1894 * retain_dead_tuples.
1895 */
1897 break;
1898
1900 {
1902 parse_subscription_options(pstate, stmt->options,
1904
1906 publicationListToArray(stmt->publication);
1908
1909 update_tuple = true;
1910
1911 /* Refresh if user asked us to. */
1912 if (opts.refresh)
1913 {
1914 if (!sub->enabled)
1915 ereport(ERROR,
1917 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1918 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1919
1920 /*
1921 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1922 * why this is not allowed.
1923 */
1924 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1925 ereport(ERROR,
1927 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1928 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1929
1930 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1931
1932 /* Make sure refresh sees the new list of publications. */
1933 sub->publications = stmt->publication;
1934
1935 AlterSubscription_refresh(sub, opts.copy_data,
1936 stmt->publication);
1937 }
1938
1939 break;
1940 }
1941
1944 {
1945 List *publist;
1947
1949 parse_subscription_options(pstate, stmt->options,
1951
1952 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1956
1957 update_tuple = true;
1958
1959 /* Refresh if user asked us to. */
1960 if (opts.refresh)
1961 {
1962 /* We only need to validate user specified publications. */
1963 List *validate_publications = (isadd) ? stmt->publication : NULL;
1964
1965 if (!sub->enabled)
1966 ereport(ERROR,
1968 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1969 /* translator: %s is an SQL ALTER command */
1970 errhint("Use %s instead.",
1971 isadd ?
1972 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1973 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1974
1975 /*
1976 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1977 * why this is not allowed.
1978 */
1979 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1980 ereport(ERROR,
1982 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1983 /* translator: %s is an SQL ALTER command */
1984 errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1985 isadd ?
1986 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1987 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1988
1989 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1990
1991 /* Refresh the new list of publications. */
1992 sub->publications = publist;
1993
1994 AlterSubscription_refresh(sub, opts.copy_data,
1996 }
1997
1998 break;
1999 }
2000
2002 {
2003 if (!sub->enabled)
2004 ereport(ERROR,
2006 errmsg("%s is not allowed for disabled subscriptions",
2007 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
2008
2009 parse_subscription_options(pstate, stmt->options,
2011
2012 /*
2013 * The subscription option "two_phase" requires that
2014 * replication has passed the initial table synchronization
2015 * phase before the two_phase becomes properly enabled.
2016 *
2017 * But, having reached this two-phase commit "enabled" state
2018 * we must not allow any subsequent table initialization to
2019 * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
2020 * disallowed when the user had requested two_phase = on mode.
2021 *
2022 * The exception to this restriction is when copy_data =
2023 * false, because when copy_data is false the tablesync will
2024 * start already in READY state and will exit directly without
2025 * doing anything.
2026 *
2027 * For more details see comments atop worker.c.
2028 */
2029 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
2030 ereport(ERROR,
2032 errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
2033 errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
2034
2035 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
2036
2037 AlterSubscription_refresh(sub, opts.copy_data, NULL);
2038
2039 break;
2040 }
2041
2043 {
2044 if (!sub->enabled)
2045 ereport(ERROR,
2047 errmsg("%s is not allowed for disabled subscriptions",
2048 "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
2049
2051
2052 break;
2053 }
2054
2056 {
2057 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
2058
2059 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
2060 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
2061
2062 /*
2063 * If the user sets subskiplsn, we do a sanity check to make
2064 * sure that the specified LSN is a probable value.
2065 */
2066 if (XLogRecPtrIsValid(opts.lsn))
2067 {
2069 char originname[NAMEDATALEN];
2070 XLogRecPtr remote_lsn;
2071
2073 originname, sizeof(originname));
2075 remote_lsn = replorigin_get_progress(originid, false);
2076
2077 /* Check the given LSN is at least a future LSN */
2078 if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
2079 ereport(ERROR,
2081 errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
2082 LSN_FORMAT_ARGS(opts.lsn),
2083 LSN_FORMAT_ARGS(remote_lsn))));
2084 }
2085
2088
2089 update_tuple = true;
2090 break;
2091 }
2092
2093 default:
2094 elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
2095 stmt->kind);
2096 }
2097
2098 /* Update the catalog if needed. */
2099 if (update_tuple)
2100 {
2102 replaces);
2103
2104 CatalogTupleUpdate(rel, &tup->t_self, tup);
2105
2107 }
2108
2109 /*
2110 * Try to acquire the connection necessary either for modifying the slot
2111 * or for checking if the remote server permits enabling
2112 * retain_dead_tuples.
2113 *
2114 * This has to be at the end because otherwise, if there is an error
2115 * while doing the database operations, we won't be able to roll back
2116 * the altered slot.
2117 */
2119 {
2120 bool must_use_password;
2121 char *err;
2123
2124 /* Load the library providing us libpq calls. */
2125 load_file("libpqwalreceiver", false);
2126
2127 /*
2128 * Try to connect to the publisher, using the new connection string if
2129 * available.
2130 */
2132 wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
2133 true, true, must_use_password, sub->name,
2134 &err);
2135 if (!wrconn)
2136 ereport(ERROR,
2138 errmsg("subscription \"%s\" could not connect to the publisher: %s",
2139 sub->name, err)));
2140
2141 PG_TRY();
2142 {
2145
2147 retain_dead_tuples, origin, NULL, 0,
2148 sub->name);
2149
2152 update_failover ? &opts.failover : NULL,
2153 update_two_phase ? &opts.twophase : NULL);
2154 }
2155 PG_FINALLY();
2156 {
2158 }
2159 PG_END_TRY();
2160 }
2161
2163
2165
2166 /* Wake up related replication workers to handle this change quickly. */
2168
2169 return myself;
2170}
2171
2172/*
2173 * Drop a subscription
2174 */
2175void
2177{
2178 Relation rel;
2180 HeapTuple tup;
2181 Oid subid;
2182 Oid subowner;
2183 Datum datum;
2184 bool isnull;
2185 char *subname;
2186 char *conninfo;
2187 char *slotname;
2189 ListCell *lc;
2190 char originname[NAMEDATALEN];
2191 char *err = NULL;
2194 List *rstates;
2195 bool must_use_password;
2196
2197 /*
2198 * The launcher may concurrently start a new worker for this subscription.
2199 * During initialization, the worker checks for subscription validity and
2200 * exits if the subscription has already been dropped. See
2201 * InitializeLogRepWorker.
2202 */
2204
2206 CStringGetDatum(stmt->subname));
2207
2208 if (!HeapTupleIsValid(tup))
2209 {
2210 table_close(rel, NoLock);
2211
2212 if (!stmt->missing_ok)
2213 ereport(ERROR,
2215 errmsg("subscription \"%s\" does not exist",
2216 stmt->subname)));
2217 else
2219 (errmsg("subscription \"%s\" does not exist, skipping",
2220 stmt->subname)));
2221
2222 return;
2223 }
2224
2226 subid = form->oid;
2227 subowner = form->subowner;
2228 must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
2229
2230 /* must be owner */
2233 stmt->subname);
2234
2235 /* DROP hook for the subscription being removed */
2237
2238 /*
2239 * Lock the subscription so nobody else can do anything with it (including
2240 * the replication workers).
2241 */
2243
2244 /* Get subname */
2247 subname = pstrdup(NameStr(*DatumGetName(datum)));
2248
2249 /* Get conninfo */
2250 if (OidIsValid(form->subserver))
2251 {
2253 ForeignServer *server;
2254
2255 server = GetForeignServer(form->subserver);
2257 form->subowner, ACL_USAGE);
2258 if (aclresult != ACLCHECK_OK)
2259 {
2260 /*
2261 * Unable to generate connection string because permissions on the
2262 * foreign server have been removed. Follow the same logic as an
2263 * unusable subconninfo (which will result in an ERROR later
2264 * unless slot_name = NONE).
2265 */
2266 err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
2267 GetUserNameFromId(form->subowner, false),
2268 server->servername);
2269 conninfo = NULL;
2270 }
2271 else
2272 conninfo = ForeignServerConnectionString(form->subowner,
2273 server);
2274 }
2275 else
2276 {
2279 conninfo = TextDatumGetCString(datum);
2280 }
2281
2282 /* Get slotname */
2285 if (!isnull)
2286 slotname = pstrdup(NameStr(*DatumGetName(datum)));
2287 else
2288 slotname = NULL;
2289
2290 /*
2291 * Since dropping a replication slot is not transactional, the replication
2292 * slot stays dropped even if the transaction rolls back. So we cannot
2293 * run DROP SUBSCRIPTION inside a transaction block if dropping the
2294 * replication slot. Also, in this case, we report a message for dropping
2295 * the subscription to the cumulative stats system.
2296 *
2297 * XXX The command name should really be something like "DROP SUBSCRIPTION
2298 * of a subscription that is associated with a replication slot", but we
2299 * don't have the proper facilities for that.
2300 */
2301 if (slotname)
2302 PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
2303
2306
2307 /* Remove the tuple from catalog. */
2308 CatalogTupleDelete(rel, &tup->t_self);
2309
2311
2312 /*
2313 * Stop all the subscription workers immediately.
2314 *
2315 * This is necessary if we are dropping the replication slot, so that the
2316 * slot becomes accessible.
2317 *
2318 * It is also necessary if the subscription is disabled and was disabled
2319 * in the same transaction. Then the workers haven't seen the disabling
2320 * yet and will still be running, leading to hangs later when we want to
2321 * drop the replication origin. If the subscription was disabled before
2322 * this transaction, then there shouldn't be any workers left, so this
2323 * won't make a difference.
2324 *
2325 * New workers won't be started because we hold an exclusive lock on the
2326 * subscription till the end of the transaction.
2327 */
2328 subworkers = logicalrep_workers_find(subid, false, true);
2329 foreach(lc, subworkers)
2330 {
2332
2334 }
2336
2337 /*
2338 * Remove the no-longer-useful entry in the launcher's table of apply
2339 * worker start times.
2340 *
2341 * If this transaction rolls back, the launcher might restart a failed
2342 * apply worker before wal_retrieve_retry_interval milliseconds have
2343 * elapsed, but that's pretty harmless.
2344 */
2346
2347 /*
2348 * Cleanup of tablesync replication origins.
2349 *
2350 * Any READY-state relations would already have dealt with clean-ups.
2351 *
2352 * Note that the state can't change because we have already stopped both
2353 * the apply and tablesync workers and they can't restart because of
2354 * exclusive lock on the subscription.
2355 */
2356 rstates = GetSubscriptionRelations(subid, true, false, true);
2357 foreach(lc, rstates)
2358 {
2360 Oid relid = rstate->relid;
2361
2362 /* Only cleanup resources of tablesync workers */
2363 if (!OidIsValid(relid))
2364 continue;
2365
2366 /*
2367 * Drop the tablesync's origin tracking if it exists.
2368 *
2369 * It is possible that the origin has not yet been created for the
2370 * tablesync worker, so pass missing_ok = true. This can happen for
2371 * the states before SUBREL_STATE_DATASYNC.
2372 */
2374 sizeof(originname));
2375 replorigin_drop_by_name(originname, true, false);
2376 }
2377
2378 /* Clean up dependencies */
2381
2382 /* Remove any associated relation synchronization states. */
2384
2385 /* Remove the origin tracking if it exists. */
2387 replorigin_drop_by_name(originname, true, false);
2388
2389 /*
2390 * Tell the cumulative stats system that the subscription is getting
2391 * dropped.
2392 */
2394
2395 /*
2396 * If there is no slot associated with the subscription, we can finish
2397 * here.
2398 */
2399 if (!slotname && rstates == NIL)
2400 {
2401 table_close(rel, NoLock);
2402 return;
2403 }
2404
2405 /*
2406 * Try to acquire the connection necessary for dropping slots.
2407 *
2408 * Note: If the slotname is NONE/NULL then we allow the command to finish
2409 * and users need to manually clean up the apply and tablesync worker slots
2410 * later.
2411 *
2412 * This has to be at the end because otherwise, if there is an error while
2413 * doing the database operations, we won't be able to roll back the dropped
2414 * slot.
2415 */
2416 load_file("libpqwalreceiver", false);
2417
2418 if (conninfo)
2419 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2420 subname, &err);
2421
2422 if (wrconn == NULL)
2423 {
2424 if (!slotname)
2425 {
2426 /* be tidy */
2428 table_close(rel, NoLock);
2429 return;
2430 }
2431 else
2432 {
2433 ReportSlotConnectionError(rstates, subid, slotname, err);
2434 }
2435 }
2436
2437 PG_TRY();
2438 {
2439 foreach(lc, rstates)
2440 {
2442 Oid relid = rstate->relid;
2443
2444 /* Only cleanup resources of tablesync workers */
2445 if (!OidIsValid(relid))
2446 continue;
2447
2448 /*
2449 * Drop the tablesync slots associated with removed tables.
2450 *
2451 * For SYNCDONE/READY states, the tablesync slot is known to have
2452 * already been dropped by the tablesync worker.
2453 *
2454 * For other states, there is no certainty, maybe the slot does
2455 * not exist yet. Also, if we fail after removing some of the
2456 * slots, next time, it will again try to drop already dropped
2457 * slots and fail. For these reasons, we allow missing_ok = true
2458 * for the drop.
2459 */
2460 if (rstate->state != SUBREL_STATE_SYNCDONE)
2461 {
2462 char syncslotname[NAMEDATALEN] = {0};
2463
2465 sizeof(syncslotname));
2467 }
2468 }
2469
2471
2472 /*
2473 * If there is a slot associated with the subscription, then drop the
2474 * replication slot at the publisher.
2475 */
2476 if (slotname)
2477 ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2478 }
2479 PG_FINALLY();
2480 {
2482 }
2483 PG_END_TRY();
2484
2485 table_close(rel, NoLock);
2486}
2487
2488/*
2489 * Drop the replication slot at the publisher node using the replication
2490 * connection.
2491 *
 * wrconn - walreceiver connection used to send the command; asserted to be
 * non-NULL.
 *
 * slotname - name of the slot to drop; it is passed through
 * quote_identifier() before being placed in the command text.
 *
2492 * missing_ok - if true then only issue a LOG message if the slot doesn't
2493 * exist.
 *
 * The command text is built in a StringInfo whose buffer is released with
 * pfree() in the PG_FINALLY block, i.e. on both the normal and error paths.
 *
 * NOTE(review): this listing skips the lines numbered 2516, 2522, 2533 and
 * 2538, so the success report call, the last operand of the missing_ok
 * condition, the first argument of the ERROR report and the statement after
 * the if/else chain are not visible here -- confirm against the full file.
2494 */
2495void
2496ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
2497{
2498 StringInfoData cmd;
2499
2500 Assert(wrconn);
2501
 /* Make sure the library implementing the walrcv_* calls is loaded. */
2502 load_file("libpqwalreceiver", false);
2503
 /* Build the replication command; the slot name is identifier-quoted. */
2504 initStringInfo(&cmd);
2505 appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
2506
2507 PG_TRY();
2508 {
2509 WalRcvExecResult *res;
2510
 /* Send the command over the supplied connection. */
2511 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2512
 /* Report the outcome according to the returned status. */
2513 if (res->status == WALRCV_OK_COMMAND)
2514 {
2515 /* NOTICE. Success. */
2517 (errmsg("dropped replication slot \"%s\" on publisher",
2518 slotname)));
2519 }
2520 else if (res->status == WALRCV_ERROR &&
2521 missing_ok &&
2523 {
2524 /* LOG. Error, but missing_ok = true. */
2525 ereport(LOG,
2526 (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2527 slotname, res->err)));
2528 }
2529 else
2530 {
2531 /* ERROR. */
2532 ereport(ERROR,
2534 errmsg("could not drop replication slot \"%s\" on publisher: %s",
2535 slotname, res->err)));
2536 }
2537
2539 }
2540 PG_FINALLY();
2541 {
 /* Free the command buffer whether or not an error was raised. */
2542 pfree(cmd.data);
2543 }
2544 PG_END_TRY();
2545}
2546
2547/*
2548 * Internal workhorse for changing a subscription owner
2549 */
2550static void
2552{
2555
2557
2558 if (form->subowner == newOwnerId)
2559 return;
2560
2563 NameStr(form->subname));
2564
2565 /*
2566 * Don't allow non-superuser modification of a subscription with
2567 * password_required=false.
2568 */
2569 if (!form->subpasswordrequired && !superuser())
2570 ereport(ERROR,
2572 errmsg("password_required=false is superuser-only"),
2573 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2574
2575 /* Must be able to become new owner */
2577
2578 /*
2579 * current owner must have CREATE on database
2580 *
2581 * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2582 * other object types behave differently (e.g. you can't give a table to a
2583 * user who lacks CREATE privileges on a schema).
2584 */
2587 if (aclresult != ACLCHECK_OK)
2590
2591 /*
2592 * If the subscription uses a server, check that the new owner has USAGE
2593 * privileges on the server and that a user mapping exists. Note: does not
2594 * re-check the resulting connection string.
2595 */
2596 if (OidIsValid(form->subserver))
2597 {
2598 ForeignServer *server = GetForeignServer(form->subserver);
2599
2601 if (aclresult != ACLCHECK_OK)
2602 ereport(ERROR,
2604 errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
2606 server->servername));
2607
2608 /* make sure a user mapping exists */
2610 }
2611
2612 form->subowner = newOwnerId;
2613 CatalogTupleUpdate(rel, &tup->t_self, tup);
2614
2615 /* Update owner dependency reference */
2617 form->oid,
2618 newOwnerId);
2619
2621 form->oid, 0);
2622
2623 /* Wake up related background processes to handle this change quickly. */
2626}
2627
2628/*
2629 * Change subscription owner -- by name
2630 */
2633{
2634 Oid subid;
2635 HeapTuple tup;
2636 Relation rel;
2637 ObjectAddress address;
2639
2641
2644
2645 if (!HeapTupleIsValid(tup))
2646 ereport(ERROR,
2648 errmsg("subscription \"%s\" does not exist", name)));
2649
2651 subid = form->oid;
2652
2654
2656
2658
2660
2661 return address;
2662}
2663
2664/*
2665 * Change subscription owner -- by OID
2666 */
2667void
2669{
2670 HeapTuple tup;
2671 Relation rel;
2672
2674
2676
2677 if (!HeapTupleIsValid(tup))
2678 ereport(ERROR,
2680 errmsg("subscription with OID %u does not exist", subid)));
2681
2683
2685
2687}
2688
2689/*
2690 * Check and log a warning if the publisher has subscribed to the same table,
2691 * its partition ancestors (if it's a partition), or its partition children (if
2692 * it's a partitioned table), from some other publishers. This check is
2693 * required in the following scenarios:
2694 *
2695 * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2696 * statements with "copy_data = true" and "origin = none":
2697 * - Warn the user that data with an origin might have been copied.
2698 * - This check is skipped for tables already added, as incremental sync via
2699 * WAL allows origin tracking. The list of such tables is in
2700 * subrel_local_oids.
2701 *
2702 * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2703 * statements with "retain_dead_tuples = true" and "origin = any", and for
2704 * ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin,
2705 * or when the publisher's status changes (e.g., due to a connection string
2706 * update):
2707 * - Warn the user that only conflict detection info for local changes on
2708 * the publisher is retained. Data from other origins may lack sufficient
2709 * details for reliable conflict detection.
2710 * - See comments atop worker.c for more details.
2711 */
2712static void
2714 bool copydata, bool retain_dead_tuples,
2715 char *origin, Oid *subrel_local_oids,
2716 int subrel_count, char *subname)
2717{
2718 WalRcvExecResult *res;
2719 StringInfoData cmd;
2720 TupleTableSlot *slot;
2721 Oid tableRow[1] = {TEXTOID};
2722 List *publist = NIL;
2723 int i;
2724 bool check_rdt;
2725 bool check_table_sync;
2726 bool origin_none = origin &&
2728
2729 /*
2730 * Enable retain_dead_tuples checks only when origin is set to 'any',
2731 * since with origin='none' only local changes are replicated to the
2732 * subscriber.
2733 */
2735
2736 /*
2737 * Enable table synchronization checks only when origin is 'none', to
2738 * ensure that data from other origins is not inadvertently copied.
2739 */
2741
2742 /* retain_dead_tuples and table sync checks occur separately */
2744
2745 /* Return if no checks are required */
2746 if (!check_rdt && !check_table_sync)
2747 return;
2748
2749 initStringInfo(&cmd);
2751 "SELECT DISTINCT P.pubname AS pubname\n"
2752 "FROM pg_publication P,\n"
2753 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2754 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2755 " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2756 " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2757 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2758 "WHERE C.oid = GPT.relid AND P.pubname IN (");
2759 GetPublicationsStr(publications, &cmd, true);
2760 appendStringInfoString(&cmd, ")\n");
2761
2762 /*
2763 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2764 * subrel_local_oids contains the list of relation oids that are already
2765 * present on the subscriber. This check should be skipped for these
2766 * tables if checking for table sync scenario. However, when handling the
2767 * retain_dead_tuples scenario, ensure all tables are checked, as some
2768 * existing tables may now include changes from other origins due to newly
2769 * created subscriptions on the publisher.
2770 */
2771 if (check_table_sync)
2772 {
2773 for (i = 0; i < subrel_count; i++)
2774 {
2775 Oid relid = subrel_local_oids[i];
2776 char *schemaname = get_namespace_name(get_rel_namespace(relid));
2777 char *tablename = get_rel_name(relid);
2778
 /*
  * One exclusion clause is appended per already-known relation.
  *
  * NOTE(review): schemaname and tablename are substituted between
  * single quotes without any escaping, so a name that itself
  * contains a single quote would end the literal early and yield a
  * malformed remote query. Consider quoting the values as SQL
  * literals before appending them.
  *
  * NOTE(review): presumably the two lookups above can return NULL
  * if the relation has gone away in the meantime -- verify, since
  * the result is passed straight to a %s conversion.
  */
2779 appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2780 schemaname, tablename);
2781 }
2782 }
2783
2784 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2785 pfree(cmd.data);
2786
2787 if (res->status != WALRCV_OK_TUPLES)
2788 ereport(ERROR,
2790 errmsg("could not receive list of replicated tables from the publisher: %s",
2791 res->err)));
2792
2793 /* Process publications: column 1 of each result row is a publication name. */
2795 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2796 {
2797 char *pubname;
2798 bool isnull;
2799
2800 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2801 Assert(!isnull);
2802
2803 ExecClearTuple(slot);
2805 }
2806
2807 /*
2808 * Log a warning if the publisher has subscribed to the same table from
2809 * some other publisher. We cannot know the origin of data during the
2810 * initial sync. Data origins can be found only from the WAL by looking at
2811 * the origin id.
2812 *
2813 * XXX: For simplicity, we don't check whether the table has any data or
2814 * not. If the table doesn't have any data then we don't need to
2815 * distinguish between data having origin and data not having origin so we
2816 * can avoid logging a warning for table sync scenario.
2817 */
2818 if (publist)
2819 {
2821
2822 /* Prepare the list of publication(s) for warning message. */
2825
 /*
  * The message text depends on which of the two checks is active:
  * the copy_data/origin wording for the table sync check, the
  * retain_dead_tuples wording otherwise.
  */
2826 if (check_table_sync)
2829 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2830 subname),
2831 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2832 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2834 errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
2835 else
2838 errmsg("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
2839 subname),
2840 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2841 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2843 errhint("Consider using origin = NONE or disabling retain_dead_tuples."));
2844 }
2845
2847
2849}
2850
2851/*
2852 * This function is similar to check_publications_origin_tables and serves
2853 * the same purpose for sequences.
2854 */
2855static void
2857 bool copydata, char *origin,
2859 char *subname)
2860{
2861 WalRcvExecResult *res;
2862 StringInfoData cmd;
2863 TupleTableSlot *slot;
2864 Oid tableRow[1] = {TEXTOID};
2865 List *publist = NIL;
2866
2867 /*
2868 * Enable sequence synchronization checks only when origin is 'none', to
2869 * ensure that sequence data from other origins is not inadvertently
2870 * copied. This check is necessary if the publisher is running PG19 or
2871 * later, where logical replication sequence synchronization is supported.
2872 */
 /*
  * NOTE(review): origin is handed to pg_strcasecmp() without a NULL
  * test whenever copydata is true, whereas
  * check_publications_origin_tables guards it with "origin &&".
  * Confirm that callers never pass a NULL origin here.
  */
2873 if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0 ||
2874 walrcv_server_version(wrconn) < 190000)
2875 return;
2876
2877 initStringInfo(&cmd);
2879 "SELECT DISTINCT P.pubname AS pubname\n"
2880 "FROM pg_publication P,\n"
2881 " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
2882 " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
2883 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2884 "WHERE C.oid = GPS.relid AND P.pubname IN (");
2885
2886 GetPublicationsStr(publications, &cmd, true);
2887 appendStringInfoString(&cmd, ")\n");
2888
2889 /*
2890 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2891 * subrel_local_oids contains the list of relations that are already
2892 * present on the subscriber. This check should be skipped as these will
2893 * not be re-synced.
2894 */
2895 for (int i = 0; i < subrel_count; i++)
2896 {
2897 Oid relid = subrel_local_oids[i];
2898 char *schemaname = get_namespace_name(get_rel_namespace(relid));
2899 char *seqname = get_rel_name(relid);
2900
 /*
  * NOTE(review): schemaname and seqname are substituted between
  * single quotes without any escaping, so a name that itself contains
  * a single quote would end the literal early and yield a malformed
  * remote query. Consider quoting the values as SQL literals before
  * appending them.
  */
2901 appendStringInfo(&cmd,
2902 "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2903 schemaname, seqname);
2904 }
2905
2906 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2907 pfree(cmd.data);
2908
2909 if (res->status != WALRCV_OK_TUPLES)
2910 ereport(ERROR,
2912 errmsg("could not receive list of replicated sequences from the publisher: %s",
2913 res->err)));
2914
2915 /* Process publications: column 1 of each result row is a publication name. */
2917 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2918 {
2919 char *pubname;
2920 bool isnull;
2921
2922 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2923 Assert(!isnull);
2924
2925 ExecClearTuple(slot);
2927 }
2928
2929 /*
2930 * Log a warning if the publisher has subscribed to the same sequence from
2931 * some other publisher. We cannot know the origin of sequence data
2932 * during the initial sync.
2933 */
2934 if (publist)
2935 {
2937
2938 /* Prepare the list of publication(s) for warning message. */
2941
2944 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2945 subname),
2946 errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
2947 "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
2949 errhint("Verify that initial data copied from the publisher sequences did not come from other origins."));
2950 }
2951
2953
2955}
2956
2957/*
2958 * Determine whether retain_dead_tuples can be enabled based on the
2959 * publisher's status.
2960 *
2961 * This option is disallowed if the publisher is running a version earlier
2962 * than PG19, or if the publisher is in recovery (i.e., it is a standby
2963 * server).
2964 *
2965 * See comments atop worker.c for a detailed explanation.
2966 */
2967static void
2969{
2970 WalRcvExecResult *res;
2971 Oid RecoveryRow[1] = {BOOLOID};
2972 TupleTableSlot *slot;
2973 bool isnull;
2974 bool remote_in_recovery;
2975
2976 if (walrcv_server_version(wrconn) < 190000)
2977 ereport(ERROR,
2979 errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
2980
2981 res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
2982
2983 if (res->status != WALRCV_OK_TUPLES)
2984 ereport(ERROR,
2986 errmsg("could not obtain recovery progress from the publisher: %s",
2987 res->err)));
2988
2990 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2991 elog(ERROR, "failed to fetch tuple for the recovery progress");
2992
2993 remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
2994
2996 ereport(ERROR,
2998 errmsg("cannot enable retain_dead_tuples if the publisher is in recovery"));
2999
3001
3003}
3004
3005/*
3006 * Check if the subscriber's configuration is adequate to enable the
3007 * retain_dead_tuples option.
3008 *
3009 * Issue an ERROR if the wal_level does not support the use of replication
3010 * slots when check_guc is set to true.
3011 *
3012 * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
3013 * set to true. This is only to highlight the importance of enabling
3014 * track_commit_timestamp instead of catching all the misconfigurations, as
3015 * this setting can be adjusted after subscription creation. Without it, the
3016 * apply worker will simply skip conflict detection.
3017 *
3018 * Issue a WARNING or NOTICE if the subscription is disabled and the retention
3019 * is active. Do not raise an ERROR since users can only modify
3020 * retain_dead_tuples for disabled subscriptions. And as long as the
3021 * subscription is enabled promptly, it will not pose issues.
3022 *
3023 * Issue a NOTICE to inform users that max_retention_duration is
3024 * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
3025 * is not issued because setting max_retention_duration causes no harm,
3026 * even when it is ineffective.
3027 */
3028void
3032 bool max_retention_set)
3033{
3036
3038 {
3040 ereport(ERROR,
3042 errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
3043 errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
3044
3048 errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
3049 errhint("Consider setting \"%s\" to true.",
3050 "track_commit_timestamp"));
3051
3055 errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
3057 ? errhint("Consider setting %s to false.",
3058 "retain_dead_tuples") : 0);
3059 }
3060 else if (max_retention_set)
3061 {
3064 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
3065 }
3066}
3067
3068/*
3069 * Return true iff 'rv' is a member of the list.
3070 */
3071static bool
3073{
3075 {
3076 if (equal(relinfo->rv, rv))
3077 return true;
3078 }
3079
3080 return false;
3081}
3082
3083/*
3084 * Get the list of tables and sequences which belong to specified publications
3085 * on the publisher connection.
3086 *
3087 * Note that we don't support the case where the column list is different for
3088 * the same table in different publications to avoid sending unwanted column
3089 * information for some of the rows. This can happen when both the column
3090 * list and row filter are specified for different publications.
3091 */
3092static List *
3094{
3095 WalRcvExecResult *res;
3096 StringInfoData cmd;
3097 TupleTableSlot *slot;
3101 bool check_columnlist = (server_version >= 150000);
3102 int column_count = check_columnlist ? 4 : 3;
3103 StringInfoData pub_names;
3104
3105 initStringInfo(&cmd);
3106 initStringInfo(&pub_names);
3107
3108 /* Build the pub_names comma-separated string. */
3109 GetPublicationsStr(publications, &pub_names, true);
3110
3111 /* Get the list of relations from the publisher */
3112 if (server_version >= 160000)
3113 {
3115
3116 /*
3117 * From version 16, we allowed passing multiple publications to the
3118 * function pg_get_publication_tables. This helped to filter out the
3119 * partition table whose ancestor is also published in this
3120 * publication array.
3121 *
3122 * Join pg_get_publication_tables with pg_publication to exclude
3123 * non-existing publications.
3124 *
3125 * Note that attrs are always stored in sorted order so we don't need
3126 * to worry if different publications have specified them in a
3127 * different order. See pub_collist_validate.
3128 */
3129 appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
3130 " FROM pg_class c\n"
3131 " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
3132 " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
3133 " FROM pg_publication\n"
3134 " WHERE pubname IN ( %s )) AS gpt\n"
3135 " ON gpt.relid = c.oid\n",
3136 pub_names.data);
3137
3138 /* From version 19, inclusion of sequences in the target is supported */
3139 if (server_version >= 190000)
3140 appendStringInfo(&cmd,
3141 "UNION ALL\n"
3142 " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
3143 " FROM pg_catalog.pg_publication_sequences s\n"
3144 " WHERE s.pubname IN ( %s )",
3145 pub_names.data);
3146 }
3147 else
3148 {
3150 appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
3151
3152 /* Get column lists for each relation if the publisher supports it */
3153 if (check_columnlist)
3154 appendStringInfoString(&cmd, ", t.attnames\n");
3155
3156 appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
3157 " WHERE t.pubname IN ( %s )",
3158 pub_names.data);
3159 }
3160
3161 pfree(pub_names.data);
3162
3164 pfree(cmd.data);
3165
3166 if (res->status != WALRCV_OK_TUPLES)
3167 ereport(ERROR,
3169 errmsg("could not receive list of replicated tables from the publisher: %s",
3170 res->err)));
3171
3172 /* Process tables. */
3174 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3175 {
3176 char *nspname;
3177 char *relname;
3178 bool isnull;
3179 char relkind;
3181
3182 nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
3183 Assert(!isnull);
3184 relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
3185 Assert(!isnull);
3186 relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
3187 Assert(!isnull);
3188
3189 relinfo->rv = makeRangeVar(nspname, relname, -1);
3190 relinfo->relkind = relkind;
3191
3192 if (relkind != RELKIND_SEQUENCE &&
3195 ereport(ERROR,
3197 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
3198 nspname, relname));
3199 else
3201
3202 ExecClearTuple(slot);
3203 }
3205
3207
3208 return relationlist;
3209}
3210
3211/*
3212 * Report a connection failure that occurred while dropping replication slots.
3213 * Here, we report a WARNING for each tablesync slot so that the user can drop
3214 * them manually, if required.
3215 */
3216static void
3217ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
3218{
3219 ListCell *lc;
3220
 /*
  * First pass over rstates: emit one WARNING for every entry that has a
  * valid relid and is not in SYNCDONE state. The ERROR is raised only
  * after all of these warnings have been issued.
  */
3221 foreach(lc, rstates)
3222 {
3224 Oid relid = rstate->relid;
3225
3226 /* Only clean up resources of tablesync workers: skip entries without a valid relid */
3227 if (!OidIsValid(relid))
3228 continue;
3229
3230 /*
3231 * Caller needs to ensure that relstate doesn't change underneath us.
3232 * See DropSubscription where we get the relstates.
3233 */
3234 if (rstate->state != SUBREL_STATE_SYNCDONE)
3235 {
 /* Zero-filled buffer for the slot name reported in the WARNING below. */
3236 char syncslotname[NAMEDATALEN] = {0};
3237
3239 sizeof(syncslotname));
3240 elog(WARNING, "could not drop tablesync replication slot \"%s\"",
3241 syncslotname);
3242 }
3243 }
3244
 /*
  * Raise the ERROR for the slot named by slotname. The text in err is
  * appended to the message, and the hint names the two ALTER SUBSCRIPTION
  * commands that detach the subscription from the slot.
  */
3245 ereport(ERROR,
3247 errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
3248 slotname, err),
3249 /* translator: %s is an SQL ALTER command */
3250 errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
3251 "ALTER SUBSCRIPTION ... DISABLE",
3252 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
3253}
3254
3255/*
3256 * Check for duplicates in the given list of publications and error out if
3257 * one is found. Add publications to datums as text datums, if datums is not
3258 * NULL.
3259 */
3260static void
3262{
3263 ListCell *cell;
 /* Next free position in datums; advanced only when datums is supplied. */
3264 int j = 0;
3265
3266 foreach(cell, publist)
3267 {
3268 char *name = strVal(lfirst(cell));
3269 ListCell *pcell;
3270
 /*
  * Compare this name against the entries that precede it in the list.
  * The scan restarts from the head and stops on reaching the current
  * cell, so each pair is compared once and an entry is never compared
  * with itself.
  */
3271 foreach(pcell, publist)
3272 {
3273 char *pname = strVal(lfirst(pcell));
3274
3275 if (pcell == cell)
3276 break;
3277
3278 if (strcmp(name, pname) == 0)
3279 ereport(ERROR,
3281 errmsg("publication name \"%s\" used more than once",
3282 pname)));
3283 }
3284
 /*
  * Names are stored in list order, one text datum per list entry.
  * NOTE(review): assumes datums has room for every element of publist
  * -- confirm at the call sites.
  */
3285 if (datums)
3286 datums[j++] = CStringGetTextDatum(name);
3287 }
3288}
3289
3290/*
3291 * Merge the current subscription's publications and the user-specified
3292 * publications from ADD/DROP PUBLICATION.
3293 *
3294 * If addpub is true, we will add the list of publications into oldpublist.
3295 * Otherwise, we will delete the list of publications from oldpublist. The
3296 * returned list is a copy, oldpublist itself is not changed.
3297 *
3298 * subname is the subscription name, for error messages.
3299 */
3300static List *
3302{
3303 ListCell *lc;
3304
3306
3308
3309 foreach(lc, newpublist)
3310 {
3311 char *name = strVal(lfirst(lc));
3312 ListCell *lc2;
3313 bool found = false;
3314
3315 foreach(lc2, oldpublist)
3316 {
3317 char *pubname = strVal(lfirst(lc2));
3318
3319 if (strcmp(name, pubname) == 0)
3320 {
3321 found = true;
3322 if (addpub)
3323 ereport(ERROR,
3325 errmsg("publication \"%s\" is already in subscription \"%s\"",
3326 name, subname)));
3327 else
3329
3330 break;
3331 }
3332 }
3333
3334 if (addpub && !found)
3336 else if (!addpub && !found)
3337 ereport(ERROR,
3339 errmsg("publication \"%s\" is not in subscription \"%s\"",
3340 name, subname)));
3341 }
3342
3343 /*
3344 * XXX Probably no strong reason for this, but for now it's to make ALTER
3345 * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
3346 */
3347 if (!oldpublist)
3348 ereport(ERROR,
3350 errmsg("cannot drop all the publications from a subscription")));
3351
3352 return oldpublist;
3353}
3354
3355/*
3356 * Extract the streaming mode value from a DefElem. This is like
3357 * defGetBoolean() but also accepts the special value of "parallel".
3358 */
3359char
3361{
3362 /*
3363 * If no parameter value given, assume "true" is meant.
3364 */
3365 if (!def->arg)
3366 return LOGICALREP_STREAM_ON;
3367
3368 /*
3369 * Allow 0, 1, "false", "true", "off", "on" or "parallel".
3370 */
3371 switch (nodeTag(def->arg))
3372 {
3373 case T_Integer:
3374 switch (intVal(def->arg))
3375 {
3376 case 0:
3377 return LOGICALREP_STREAM_OFF;
3378 case 1:
3379 return LOGICALREP_STREAM_ON;
3380 default:
3381 /* otherwise, error out below */
3382 break;
3383 }
3384 break;
3385 default:
3386 {
3387 char *sval = defGetString(def);
3388
3389 /*
3390 * The set of strings accepted here should match up with the
3391 * grammar's opt_boolean_or_string production.
3392 */
3393 if (pg_strcasecmp(sval, "false") == 0 ||
3394 pg_strcasecmp(sval, "off") == 0)
3395 return LOGICALREP_STREAM_OFF;
3396 if (pg_strcasecmp(sval, "true") == 0 ||
3397 pg_strcasecmp(sval, "on") == 0)
3398 return LOGICALREP_STREAM_ON;
3399 if (pg_strcasecmp(sval, "parallel") == 0)
3401 }
3402 break;
3403 }
3404
3405 ereport(ERROR,
3407 errmsg("%s requires a Boolean value or \"parallel\"",
3408 def->defname)));
3409 return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
3410}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5314
void check_can_set_role(Oid member, Oid role)
Definition acl.c:5371
AclResult
Definition acl.h:183
@ ACLCHECK_OK
Definition acl.h:184
@ ACLCHECK_NOT_OWNER
Definition acl.h:186
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition aclchk.c:2672
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition aclchk.c:3879
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition aclchk.c:4133
ArrayType * construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
void LogicalRepWorkersWakeupAtCommit(Oid subid)
Definition worker.c:6318
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:648
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define TextDatumGetCString(d)
Definition builtins.h:99
#define NameStr(name)
Definition c.h:835
#define Assert(condition)
Definition c.h:943
#define CppAsString2(x)
Definition c.h:506
int32_t int32
Definition c.h:620
uint32_t uint32
Definition c.h:624
#define OidIsValid(objectId)
Definition c.h:858
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition catalog.c:448
bool track_commit_timestamp
Definition commit_ts.c:121
int32 defGetInt32(DefElem *def)
Definition define.c:148
char * defGetString(DefElem *def)
Definition define.c:34
bool defGetBoolean(DefElem *def)
Definition define.c:93
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition define.c:370
@ DEPENDENCY_NORMAL
Definition dependency.h:33
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
int errcode(int sqlerrcode)
Definition elog.c:874
#define _(x)
Definition elog.c:95
#define LOG
Definition elog.h:32
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:374
#define WARNING
Definition elog.h:37
int int int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...) pg_attribute_printf(1
#define PG_END_TRY(...)
Definition elog.h:399
#define DEBUG1
Definition elog.h:31
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define NOTICE
Definition elog.h:36
#define PG_FINALLY(...)
Definition elog.h:391
#define ereport(elevel,...)
Definition elog.h:152
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...) pg_attribute_printf(1
bool equal(const void *a, const void *b)
Definition equalfuncs.c:223
void err(int eval, const char *fmt,...)
Definition err.c:43
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
Definition execTuples.c:86
#define palloc_object(type)
Definition fe_memutils.h:74
#define palloc_array(type, count)
Definition fe_memutils.h:76
#define DirectFunctionCall1(func, arg1)
Definition fmgr.h:684
ForeignServer * GetForeignServerByName(const char *srvname, bool missing_ok)
Definition foreign.c:185
char * ForeignServerConnectionString(Oid userid, ForeignServer *server)
Definition foreign.c:202
UserMapping * GetUserMapping(Oid userid, Oid serverid)
Definition foreign.c:232
ForeignServer * GetForeignServer(Oid serverid)
Definition foreign.c:114
Oid MyDatabaseId
Definition globals.c:96
bool parse_int(const char *value, int *result, int flags, const char **hintmsg)
Definition guc.c:2775
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition guc.c:3248
@ GUC_ACTION_SET
Definition guc.h:203
@ PGC_S_TEST
Definition guc.h:125
@ PGC_BACKEND
Definition guc.h:77
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1118
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1372
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define stmt
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
long val
Definition informix.c:689
int j
Definition isn.c:78
int i
Definition isn.c:77
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:303
void ApplyLauncherWakeupAtCommit(void)
Definition launcher.c:1185
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:662
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1155
List * lappend(List *list, void *datum)
Definition list.c:339
List * list_delete(List *list, void *datum)
Definition list.c:853
List * list_append_unique(List *list, void *datum)
Definition list.c:1343
List * list_copy(const List *oldlist)
Definition list.c:1573
void list_free(List *list)
Definition list.c:1546
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define AccessShareLock
Definition lockdefs.h:36
#define RowExclusiveLock
Definition lockdefs.h:38
char * get_rel_name(Oid relid)
Definition lsyscache.c:2148
char * get_database_name(Oid dbid)
Definition lsyscache.c:1312
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2223
Oid get_rel_namespace(Oid relid)
Definition lsyscache.c:2172
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3588
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition makefuncs.c:473
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
void * palloc(Size size)
Definition mcxt.c:1387
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
Oid GetUserId(void)
Definition miscinit.c:470
char * GetUserNameFromId(Oid roleid, bool noerr)
Definition miscinit.c:990
Datum namein(PG_FUNCTION_ARGS)
Definition name.c:48
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition namespace.h:98
#define nodeTag(nodeptr)
Definition nodes.h:139
static char * errmsg
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define InvokeObjectDropHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
int oid_cmp(const void *p1, const void *p2)
Definition oid.c:287
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:274
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:243
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
Definition origin.c:1057
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:459
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
@ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SERVER
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH_SEQUENCES
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
#define ACL_USAGE
Definition parsenodes.h:84
@ OBJECT_DATABASE
@ OBJECT_FOREIGN_SERVER
@ OBJECT_SUBSCRIPTION
#define ACL_CREATE
Definition parsenodes.h:85
static AmcheckOptions opts
Definition pg_amcheck.c:112
NameData relname
Definition pg_class.h:40
#define NAMEDATALEN
void recordDependencyOn(const ObjectAddress *depender, const ObjectAddress *referenced, DependencyType behavior)
Definition pg_depend.c:47
long deleteDependencyRecordsFor(Oid classId, Oid objectId, bool skipExtensionDeps)
Definition pg_depend.c:303
long deleteDependencyRecordsForSpecific(Oid classId, Oid objectId, char deptype, Oid refclassId, Oid refobjectId)
Definition pg_depend.c:400
static int server_version
Definition pg_dumpall.c:122
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define foreach_delete_current(lst, var_or_cell)
Definition pg_list.h:423
#define foreach_ptr(type, var, lst)
Definition pg_list.h:501
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition pg_lsn.c:64
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static XLogRecPtr DatumGetLSN(Datum X)
Definition pg_lsn.h:25
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
Subscription * GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
void RemoveSubscriptionRel(Oid subid, Oid relid)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
NameData subname
END_CATALOG_STRUCT typedef FormData_pg_subscription * Form_pg_subscription
void pgstat_drop_subscription(Oid subid)
void pgstat_create_subscription(Oid subid)
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
Definition port.h:495
static bool DatumGetBool(Datum X)
Definition postgres.h:100
static Datum PointerGetDatum(const void *X)
Definition postgres.h:342
static Name DatumGetName(Datum X)
Definition postgres.h:380
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
uint64_t Datum
Definition postgres.h:70
static char DatumGetChar(Datum X)
Definition postgres.h:122
static Datum CStringGetDatum(const char *X)
Definition postgres.h:370
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define InvalidOid
unsigned int Oid
static int fb(int x)
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
#define RelationGetDescr(relation)
Definition rel.h:542
const char * quote_identifier(const char *ident)
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:265
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition stringinfo.c:242
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
char * defname
Definition parsenodes.h:860
Node * arg
Definition parsenodes.h:861
char * servername
Definition foreign.h:40
Definition pg_list.h:54
LogicalRepWorkerType type
char * relname
Definition primnodes.h:84
char * schemaname
Definition primnodes.h:81
uint32 specified_opts
bool retaindeadtuples
char * wal_receiver_timeout
char * synchronous_commit
int32 maxretention
char * slot_name
XLogRecPtr lsn
bool passwordrequired
Tuplestorestate * tuplestore
TupleDesc tupledesc
WalRcvExecStatus status
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
#define SUBOPT_STREAMING
char defGetStreamingMode(DefElem *def)
#define SUBOPT_CREATE_SLOT
#define SUBOPT_PASSWORD_REQUIRED
ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
#define SUBOPT_SYNCHRONOUS_COMMIT
#define SUBOPT_ENABLED
static void check_duplicates_in_publist(List *publist, Datum *datums)
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
#define SUBOPT_RETAIN_DEAD_TUPLES
#define SUBOPT_ORIGIN
static Datum publicationListToArray(List *publist)
#define SUBOPT_FAILOVER
static void check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static void check_publications(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_SLOT_NAME
static List * fetch_relation_list(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_COPY_DATA
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
#define SUBOPT_DISABLE_ON_ERR
static void parse_subscription_options(ParseState *pstate, List *stmt_options, uint32 supported_opts, SubOpts *opts)
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
static void AlterSubscription_refresh_seq(Subscription *sub)
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
#define SUBOPT_LSN
#define SUBOPT_MAX_RETENTION_DURATION
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
#define SUBOPT_WAL_RECEIVER_TIMEOUT
static void check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static bool list_member_rangevar(const List *list, RangeVar *rv)
#define SUBOPT_BINARY
#define IsSet(val, bits)
#define SUBOPT_REFRESH
#define SUBOPT_CONNECT
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
bool superuser_arg(Oid roleid)
Definition superuser.c:57
bool superuser(void)
Definition superuser.c:47
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:265
HeapTuple SearchSysCache2(SysCacheIdentifier cacheId, Datum key1, Datum key2)
Definition syscache.c:231
Datum SysCacheGetAttrNotNull(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition syscache.c:626
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:596
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition syscache.h:93
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition syscache.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition tablesync.c:1235
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition tablesync.c:1680
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition tuptable.h:417
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:476
bool LookupGXactBySubid(Oid subid)
Definition twophase.c:2803
String * makeString(char *str)
Definition value.c:63
#define intVal(v)
Definition value.h:79
#define strVal(v)
Definition value.h:82
const char * name
static WalReceiverConn * wrconn
Definition walreceiver.c:95
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
@ WALRCV_OK_COMMAND
@ WALRCV_ERROR
@ WALRCV_OK_TUPLES
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23
@ WORKERTYPE_TABLESYNC
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3698
int wal_level
Definition xlog.c:138
@ WAL_LEVEL_REPLICA
Definition xlog.h:77
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28