PostgreSQL Source Code git master
Loading...
Searching...
No Matches
subscriptioncmds.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * subscriptioncmds.c
4 * subscription catalog manipulation functions
5 *
6 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/commands/subscriptioncmds.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#include "postgres.h"
16
17#include "access/commit_ts.h"
18#include "access/htup_details.h"
19#include "access/table.h"
20#include "access/twophase.h"
21#include "access/xact.h"
22#include "catalog/catalog.h"
23#include "catalog/dependency.h"
24#include "catalog/indexing.h"
25#include "catalog/namespace.h"
28#include "catalog/pg_authid_d.h"
29#include "catalog/pg_database_d.h"
32#include "catalog/pg_type.h"
33#include "commands/defrem.h"
36#include "executor/executor.h"
37#include "miscadmin.h"
38#include "nodes/makefuncs.h"
39#include "pgstat.h"
42#include "replication/origin.h"
43#include "replication/slot.h"
47#include "storage/lmgr.h"
48#include "utils/acl.h"
49#include "utils/builtins.h"
50#include "utils/guc.h"
51#include "utils/lsyscache.h"
52#include "utils/memutils.h"
53#include "utils/pg_lsn.h"
54#include "utils/syscache.h"
55
56/*
57 * Options that can be specified by the user in CREATE/ALTER SUBSCRIPTION
58 * command.
 *
 * Each option is assigned its own single bit, so any combination of options
 * can be OR-ed together into one bitmap (as the option parser does with
 * "specified_opts |= SUBOPT_...") and tested with IsSet().
59 */
60#define SUBOPT_CONNECT 0x00000001
61#define SUBOPT_ENABLED 0x00000002
62#define SUBOPT_CREATE_SLOT 0x00000004
63#define SUBOPT_SLOT_NAME 0x00000008
64#define SUBOPT_COPY_DATA 0x00000010
65#define SUBOPT_SYNCHRONOUS_COMMIT 0x00000020
66#define SUBOPT_REFRESH 0x00000040
67#define SUBOPT_BINARY 0x00000080
68#define SUBOPT_STREAMING 0x00000100
69#define SUBOPT_TWOPHASE_COMMIT 0x00000200
70#define SUBOPT_DISABLE_ON_ERR 0x00000400
71#define SUBOPT_PASSWORD_REQUIRED 0x00000800
72#define SUBOPT_RUN_AS_OWNER 0x00001000
73#define SUBOPT_FAILOVER 0x00002000
74#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
75#define SUBOPT_MAX_RETENTION_DURATION 0x00008000
76#define SUBOPT_WAL_RECEIVER_TIMEOUT 0x00010000
77#define SUBOPT_LSN 0x00020000
78#define SUBOPT_ORIGIN 0x00040000
79
80/*
 * Check whether 'val' has every bit of 'bits' set.  When 'bits' combines
 * several options, the result is true only if all of them are present in
 * 'val', not merely one of them.
 */
81#define IsSet(val, bits) (((val) & (bits)) == (bits))
82
83/*
84 * Structure to hold a bitmap representing the user-provided CREATE/ALTER
85 * SUBSCRIPTION command options and the parsed/default values of each of them.
86 */
110
111/*
112 * PublicationRelKind represents a relation included in a publication.
113 * It stores the schema-qualified relation name (rv) and its kind (relkind).
114 */
120
121static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
123 List *publications, bool copydata,
125 char *origin,
127 int subrel_count, char *subname);
129 List *publications,
130 bool copydata, char *origin,
132 int subrel_count,
133 char *subname);
135static void check_duplicates_in_publist(List *publist, Datum *datums);
136static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
137static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
138static void CheckAlterSubOption(Subscription *sub, const char *option,
139 bool slot_needs_update, bool isTopLevel);
140
141
142/*
143 * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
144 *
145 * Since not all options can be specified in both commands, this function
146 * will report an error if mutually exclusive options are specified.
147 */
148static void
151{
152 ListCell *lc;
153
154 /* Start out with cleared opts. */
155 memset(opts, 0, sizeof(SubOpts));
156
157 /* caller must expect some option */
159
160 /* If connect option is supported, these others also need to be. */
164
165 /* Set default values for the supported options. */
167 opts->connect = true;
169 opts->enabled = true;
171 opts->create_slot = true;
173 opts->copy_data = true;
175 opts->refresh = true;
177 opts->binary = false;
179 opts->streaming = LOGICALREP_STREAM_PARALLEL;
181 opts->twophase = false;
183 opts->disableonerr = false;
185 opts->passwordrequired = true;
187 opts->runasowner = false;
189 opts->failover = false;
191 opts->retaindeadtuples = false;
193 opts->maxretention = 0;
196
197 /* Parse options */
198 foreach(lc, stmt_options)
199 {
200 DefElem *defel = (DefElem *) lfirst(lc);
201
203 strcmp(defel->defname, "connect") == 0)
204 {
205 if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
207
208 opts->specified_opts |= SUBOPT_CONNECT;
209 opts->connect = defGetBoolean(defel);
210 }
212 strcmp(defel->defname, "enabled") == 0)
213 {
214 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
216
217 opts->specified_opts |= SUBOPT_ENABLED;
218 opts->enabled = defGetBoolean(defel);
219 }
221 strcmp(defel->defname, "create_slot") == 0)
222 {
223 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
225
226 opts->specified_opts |= SUBOPT_CREATE_SLOT;
227 opts->create_slot = defGetBoolean(defel);
228 }
230 strcmp(defel->defname, "slot_name") == 0)
231 {
232 if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
234
235 opts->specified_opts |= SUBOPT_SLOT_NAME;
236 opts->slot_name = defGetString(defel);
237
238 /* Setting slot_name = NONE is treated as no slot name. */
239 if (strcmp(opts->slot_name, "none") == 0)
240 opts->slot_name = NULL;
241 else
242 ReplicationSlotValidateName(opts->slot_name, false, ERROR);
243 }
245 strcmp(defel->defname, "copy_data") == 0)
246 {
247 if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
249
250 opts->specified_opts |= SUBOPT_COPY_DATA;
251 opts->copy_data = defGetBoolean(defel);
252 }
254 strcmp(defel->defname, "synchronous_commit") == 0)
255 {
256 if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
258
259 opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
260 opts->synchronous_commit = defGetString(defel);
261
262 /* Test if the given value is valid for synchronous_commit GUC. */
263 (void) set_config_option("synchronous_commit", opts->synchronous_commit,
265 false, 0, false);
266 }
268 strcmp(defel->defname, "refresh") == 0)
269 {
270 if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
272
273 opts->specified_opts |= SUBOPT_REFRESH;
274 opts->refresh = defGetBoolean(defel);
275 }
277 strcmp(defel->defname, "binary") == 0)
278 {
279 if (IsSet(opts->specified_opts, SUBOPT_BINARY))
281
282 opts->specified_opts |= SUBOPT_BINARY;
283 opts->binary = defGetBoolean(defel);
284 }
286 strcmp(defel->defname, "streaming") == 0)
287 {
288 if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
290
291 opts->specified_opts |= SUBOPT_STREAMING;
292 opts->streaming = defGetStreamingMode(defel);
293 }
295 strcmp(defel->defname, "two_phase") == 0)
296 {
297 if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
299
300 opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
301 opts->twophase = defGetBoolean(defel);
302 }
304 strcmp(defel->defname, "disable_on_error") == 0)
305 {
306 if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
308
309 opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
310 opts->disableonerr = defGetBoolean(defel);
311 }
313 strcmp(defel->defname, "password_required") == 0)
314 {
315 if (IsSet(opts->specified_opts, SUBOPT_PASSWORD_REQUIRED))
317
318 opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
319 opts->passwordrequired = defGetBoolean(defel);
320 }
322 strcmp(defel->defname, "run_as_owner") == 0)
323 {
324 if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
326
327 opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
328 opts->runasowner = defGetBoolean(defel);
329 }
331 strcmp(defel->defname, "failover") == 0)
332 {
333 if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
335
336 opts->specified_opts |= SUBOPT_FAILOVER;
337 opts->failover = defGetBoolean(defel);
338 }
340 strcmp(defel->defname, "retain_dead_tuples") == 0)
341 {
342 if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
344
345 opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
346 opts->retaindeadtuples = defGetBoolean(defel);
347 }
349 strcmp(defel->defname, "max_retention_duration") == 0)
350 {
351 if (IsSet(opts->specified_opts, SUBOPT_MAX_RETENTION_DURATION))
353
354 opts->specified_opts |= SUBOPT_MAX_RETENTION_DURATION;
355 opts->maxretention = defGetInt32(defel);
356 }
358 strcmp(defel->defname, "origin") == 0)
359 {
360 if (IsSet(opts->specified_opts, SUBOPT_ORIGIN))
362
363 opts->specified_opts |= SUBOPT_ORIGIN;
364 pfree(opts->origin);
365
366 /*
367 * Even though the "origin" parameter allows only "none" and "any"
368 * values, it is implemented as a string type so that the
369 * parameter can be extended in future versions to support
370 * filtering using origin names specified by the user.
371 */
372 opts->origin = defGetString(defel);
373
374 if ((pg_strcasecmp(opts->origin, LOGICALREP_ORIGIN_NONE) != 0) &&
378 errmsg("unrecognized origin value: \"%s\"", opts->origin));
379 }
380 else if (IsSet(supported_opts, SUBOPT_LSN) &&
381 strcmp(defel->defname, "lsn") == 0)
382 {
383 char *lsn_str = defGetString(defel);
384 XLogRecPtr lsn;
385
386 if (IsSet(opts->specified_opts, SUBOPT_LSN))
388
389 /* Setting lsn = NONE is treated as resetting LSN */
390 if (strcmp(lsn_str, "none") == 0)
391 lsn = InvalidXLogRecPtr;
392 else
393 {
394 /* Parse the argument as LSN */
397
398 if (!XLogRecPtrIsValid(lsn))
401 errmsg("invalid WAL location (LSN): %s", lsn_str)));
402 }
403
404 opts->specified_opts |= SUBOPT_LSN;
405 opts->lsn = lsn;
406 }
408 strcmp(defel->defname, "wal_receiver_timeout") == 0)
409 {
410 bool parsed;
411 int val;
412
413 if (IsSet(opts->specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
415
416 opts->specified_opts |= SUBOPT_WAL_RECEIVER_TIMEOUT;
417 opts->wal_receiver_timeout = defGetString(defel);
418
419 /*
420 * Test if the given value is valid for wal_receiver_timeout GUC.
421 * Skip this test if the value is -1, since -1 is allowed for the
422 * wal_receiver_timeout subscription option, but not for the GUC
423 * itself.
424 */
425 parsed = parse_int(opts->wal_receiver_timeout, &val, 0, NULL);
426 if (!parsed || val != -1)
427 (void) set_config_option("wal_receiver_timeout", opts->wal_receiver_timeout,
429 false, 0, false);
430 }
431 else
434 errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
435 }
436
437 /*
438 * We've been explicitly asked to not connect, that requires some
439 * additional processing.
440 */
441 if (!opts->connect && IsSet(supported_opts, SUBOPT_CONNECT))
442 {
443 /* Check for incompatible options from the user. */
444 if (opts->enabled &&
445 IsSet(opts->specified_opts, SUBOPT_ENABLED))
448 /*- translator: both %s are strings of the form "option = value" */
449 errmsg("%s and %s are mutually exclusive options",
450 "connect = false", "enabled = true")));
451
452 if (opts->create_slot &&
453 IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
456 errmsg("%s and %s are mutually exclusive options",
457 "connect = false", "create_slot = true")));
458
459 if (opts->copy_data &&
460 IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
463 errmsg("%s and %s are mutually exclusive options",
464 "connect = false", "copy_data = true")));
465
466 /* Change the defaults of other options. */
467 opts->enabled = false;
468 opts->create_slot = false;
469 opts->copy_data = false;
470 }
471
472 /*
473 * Do additional checking for disallowed combination when slot_name = NONE
474 * was used.
475 */
476 if (!opts->slot_name &&
477 IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
478 {
479 if (opts->enabled)
480 {
481 if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
484 /*- translator: both %s are strings of the form "option = value" */
485 errmsg("%s and %s are mutually exclusive options",
486 "slot_name = NONE", "enabled = true")));
487 else
490 /*- translator: both %s are strings of the form "option = value" */
491 errmsg("subscription with %s must also set %s",
492 "slot_name = NONE", "enabled = false")));
493 }
494
495 if (opts->create_slot)
496 {
497 if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
500 /*- translator: both %s are strings of the form "option = value" */
501 errmsg("%s and %s are mutually exclusive options",
502 "slot_name = NONE", "create_slot = true")));
503 else
506 /*- translator: both %s are strings of the form "option = value" */
507 errmsg("subscription with %s must also set %s",
508 "slot_name = NONE", "create_slot = false")));
509 }
510 }
511}
512
513/*
514 * Check that the specified publications are present on the publisher.
515 */
516static void
518{
519 WalRcvExecResult *res;
520 StringInfoData cmd;
521 TupleTableSlot *slot;
523 Oid tableRow[1] = {TEXTOID};
524
525 initStringInfo(&cmd);
526 appendStringInfoString(&cmd, "SELECT t.pubname FROM\n"
527 " pg_catalog.pg_publication t WHERE\n"
528 " t.pubname IN (");
529 GetPublicationsStr(publications, &cmd, true);
530 appendStringInfoChar(&cmd, ')');
531
532 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
533 pfree(cmd.data);
534
535 if (res->status != WALRCV_OK_TUPLES)
537 errmsg("could not receive list of publications from the publisher: %s",
538 res->err));
539
540 publicationsCopy = list_copy(publications);
541
542 /* Process publication(s). */
544 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
545 {
546 char *pubname;
547 bool isnull;
548
549 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
550 Assert(!isnull);
551
552 /* Delete the publication present in publisher from the list. */
554 ExecClearTuple(slot);
555 }
556
558
560
562 {
563 /* Prepare the list of non-existent publication(s) for error message. */
565
567
571 errmsg_plural("publication %s does not exist on the publisher",
572 "publications %s do not exist on the publisher",
574 pubnames.data));
575 }
576}
577
578/*
579 * Auxiliary function to build a text array out of a list of String nodes.
580 */
581static Datum
583{
584 ArrayType *arr;
585 Datum *datums;
586 MemoryContext memcxt;
588
589 /* Create memory context for temporary allocations. */
591 "publicationListToArray to array",
594
596
598
600
602
603 MemoryContextDelete(memcxt);
604
605 return PointerGetDatum(arr);
606}
607
608/*
609 * Create new subscription.
610 */
613 bool isTopLevel)
614{
615 Relation rel;
617 Oid subid;
618 bool nulls[Natts_pg_subscription];
620 Oid owner = GetUserId();
622 char *conninfo;
624 List *publications;
626 SubOpts opts = {0};
628
629 /*
630 * Parse and check options.
631 *
632 * Connection and publication should not be specified here.
633 */
644
645 /*
646 * Since creating a replication slot is not transactional, rolling back
647 * the transaction would leave the created replication slot behind. So
648 * we cannot run CREATE SUBSCRIPTION inside a transaction block if it
649 * creates a replication slot.
650 */
651 if (opts.create_slot)
652 PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
653
654 /*
655 * We don't want to allow unprivileged users to be able to trigger
656 * attempts to access arbitrary network destinations, so require the user
657 * to have been specifically authorized to create subscriptions.
658 */
662 errmsg("permission denied to create subscription"),
663 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions.",
664 "pg_create_subscription")));
665
666 /*
667 * Since a subscription is a database object, we also check for CREATE
668 * permission on the database.
669 */
671 owner, ACL_CREATE);
672 if (aclresult != ACLCHECK_OK)
675
676 /*
677 * Non-superusers are required to set a password for authentication, and
678 * that password must be used by the target server, but the superuser can
679 * exempt a subscription from this requirement.
680 */
681 if (!opts.passwordrequired && !superuser_arg(owner))
684 errmsg("password_required=false is superuser-only"),
685 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
686
687 /*
688 * If built with appropriate switch, whine when regression-testing
689 * conventions for subscription names are violated.
690 */
691#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
692 if (strncmp(stmt->subname, "regress_", 8) != 0)
693 elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
694#endif
695
697
698 /* Check if name is used */
701 if (OidIsValid(subid))
702 {
705 errmsg("subscription \"%s\" already exists",
706 stmt->subname)));
707 }
708
709 /*
710 * Ensure that system configuration parameters are set appropriately to
711 * support retain_dead_tuples and max_retention_duration.
712 */
714 opts.retaindeadtuples, opts.retaindeadtuples,
715 (opts.maxretention > 0));
716
717 if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
718 opts.slot_name == NULL)
719 opts.slot_name = stmt->subname;
720
721 /* The default for synchronous_commit of subscriptions is off. */
722 if (opts.synchronous_commit == NULL)
723 opts.synchronous_commit = "off";
724
725 /*
726 * The default for wal_receiver_timeout of subscriptions is -1, which
727 * means the value is inherited from the server configuration, command
728 * line, or role/database settings.
729 */
730 if (opts.wal_receiver_timeout == NULL)
731 opts.wal_receiver_timeout = "-1";
732
733 conninfo = stmt->conninfo;
734 publications = stmt->publication;
735
736 /* Load the library providing us libpq calls. */
737 load_file("libpqwalreceiver", false);
738
739 /* Check the connection info string. */
740 walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
741
742 /* Everything ok, form a new tuple. */
743 memset(values, 0, sizeof(values));
744 memset(nulls, false, sizeof(nulls));
745
758 CharGetDatum(opts.twophase ?
766 BoolGetDatum(opts.retaindeadtuples);
768 Int32GetDatum(opts.maxretention);
770 Int32GetDatum(opts.retaindeadtuples);
772 CStringGetTextDatum(conninfo);
773 if (opts.slot_name)
776 else
777 nulls[Anum_pg_subscription_subslotname - 1] = true;
779 CStringGetTextDatum(opts.synchronous_commit);
781 CStringGetTextDatum(opts.wal_receiver_timeout);
783 publicationListToArray(publications);
786
788
789 /* Insert tuple into catalog. */
792
794
795 /*
796 * A replication origin is currently created for all subscriptions,
797 * including those that only contain sequences or are otherwise empty.
798 *
799 * XXX: While this is technically unnecessary, optimizing it would require
800 * additional logic to skip origin creation during DDL operations and
801 * apply workers initialization, and to handle origin creation dynamically
802 * when tables are added to the subscription. It is not clear whether
803 * preventing creation of origins is worth additional complexity.
804 */
807
808 /*
809 * Connect to remote side to execute requested commands and fetch table
810 * and sequence info.
811 */
812 if (opts.connect)
813 {
814 char *err;
817
818 /* Try to connect to the publisher. */
819 must_use_password = !superuser_arg(owner) && opts.passwordrequired;
820 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
821 stmt->subname, &err);
822 if (!wrconn)
825 errmsg("subscription \"%s\" could not connect to the publisher: %s",
826 stmt->subname, err)));
827
828 PG_TRY();
829 {
830 bool has_tables = false;
831 List *pubrels;
832 char relation_state;
833
834 check_publications(wrconn, publications);
836 opts.copy_data,
837 opts.retaindeadtuples, opts.origin,
838 NULL, 0, stmt->subname);
840 opts.copy_data, opts.origin,
841 NULL, 0, stmt->subname);
842
843 if (opts.retaindeadtuples)
845
846 /*
847 * Set sync state based on whether we were asked to do data copy or
848 * not.
849 */
851
852 /*
853 * Build local relation status info. Relations are for both tables
854 * and sequences from the publisher.
855 */
856 pubrels = fetch_relation_list(wrconn, publications);
857
859 {
860 Oid relid;
861 char relkind;
862 RangeVar *rv = pubrelinfo->rv;
863
864 relid = RangeVarGetRelid(rv, AccessShareLock, false);
865 relkind = get_rel_relkind(relid);
866
867 /* Check for supported relkind. */
868 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
869 rv->schemaname, rv->relname);
870 has_tables |= (relkind != RELKIND_SEQUENCE);
872 InvalidXLogRecPtr, true);
873 }
874
875 /*
876 * If requested, create permanent slot for the subscription. We
877 * won't use the initial snapshot for anything, so no need to
878 * export it.
879 *
880 * XXX: Similar to origins, it is not clear whether preventing the
881 * slot creation for empty and sequence-only subscriptions is
882 * worth additional complexity.
883 */
884 if (opts.create_slot)
885 {
886 bool twophase_enabled = false;
887
888 Assert(opts.slot_name);
889
890 /*
891 * Even if two_phase is set, don't create the slot with
892 * two-phase enabled. Will enable it once all the tables are
893 * synced and ready. This avoids race-conditions like prepared
894 * transactions being skipped due to changes not being applied
895 * due to checks in should_apply_changes_for_rel() when
896 * tablesync for the corresponding tables are in progress. See
897 * comments atop worker.c.
898 *
899 * Note that if tables were specified but copy_data is false
900 * then it is safe to enable two_phase up-front because those
901 * tables are already initially in READY state. When the
902 * subscription has no tables, we leave the twophase state as
903 * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
904 * PUBLICATION to work.
905 */
906 if (opts.twophase && !opts.copy_data && has_tables)
907 twophase_enabled = true;
908
910 opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);
911
914
916 (errmsg("created replication slot \"%s\" on publisher",
917 opts.slot_name)));
918 }
919 }
920 PG_FINALLY();
921 {
923 }
924 PG_END_TRY();
925 }
926 else
928 (errmsg("subscription was created, but is not connected"),
929 errhint("To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.")));
930
932
934
935 /*
936 * Notify the launcher to start the apply worker if the subscription is
937 * enabled, or to create the conflict detection slot if retain_dead_tuples
938 * is enabled.
939 *
940 * Creating the conflict detection slot is essential even when the
941 * subscription is not enabled. This ensures that dead tuples are
942 * retained, which is necessary for accurately identifying the type of
943 * conflict during replication.
944 */
945 if (opts.enabled || opts.retaindeadtuples)
947
949
951
952 return myself;
953}
954
955static void
958{
959 char *err;
960 List *pubrels = NIL;
966 int subrel_count;
967 ListCell *lc;
968 int off;
969 int tbl_count = 0;
970 int seq_count = 0;
971 Relation rel = NULL;
972 typedef struct SubRemoveRels
973 {
974 Oid relid;
975 char state;
977
980
981 /* Load the library providing us libpq calls. */
982 load_file("libpqwalreceiver", false);
983
984 /* Try to connect to the publisher. */
987 sub->name, &err);
988 if (!wrconn)
991 errmsg("subscription \"%s\" could not connect to the publisher: %s",
992 sub->name, err)));
993
994 PG_TRY();
995 {
998
999 /* Get the relation list from publisher. */
1001
1002 /* Get local relation list. */
1003 subrel_states = GetSubscriptionRelations(sub->oid, true, true, false);
1005
1006 /*
1007 * Build qsorted arrays of local table oids and sequence oids for
1008 * faster lookup. This can potentially contain all tables and
1009 * sequences in the database so speed of lookup is important.
1010 *
1011 * We do not yet know the exact count of tables and sequences, so we
1012 * allocate separate arrays for table OIDs and sequence OIDs based on
1013 * the total number of relations (subrel_count).
1014 */
1017 foreach(lc, subrel_states)
1018 {
1020
1021 if (get_rel_relkind(relstate->relid) == RELKIND_SEQUENCE)
1022 subseq_local_oids[seq_count++] = relstate->relid;
1023 else
1024 subrel_local_oids[tbl_count++] = relstate->relid;
1025 }
1026
1029 sub->retaindeadtuples, sub->origin,
1031 sub->name);
1032
1035 copy_data, sub->origin,
1037 sub->name);
1038
1039 /*
1040 * Walk over the remote relations and try to match them to locally
1041 * known relations. If the relation is not known locally create a new
1042 * state for it.
1043 *
1044 * Also builds array of local oids of remote relations for the next
1045 * step.
1046 */
1047 off = 0;
1049
1051 {
1052 RangeVar *rv = pubrelinfo->rv;
1053 Oid relid;
1054 char relkind;
1055
1056 relid = RangeVarGetRelid(rv, AccessShareLock, false);
1057 relkind = get_rel_relkind(relid);
1058
1059 /* Check for supported relkind. */
1060 CheckSubscriptionRelkind(relkind, pubrelinfo->relkind,
1061 rv->schemaname, rv->relname);
1062
1063 pubrel_local_oids[off++] = relid;
1064
1065 if (!bsearch(&relid, subrel_local_oids,
1066 tbl_count, sizeof(Oid), oid_cmp) &&
1067 !bsearch(&relid, subseq_local_oids,
1068 seq_count, sizeof(Oid), oid_cmp))
1069 {
1070 AddSubscriptionRelState(sub->oid, relid,
1072 InvalidXLogRecPtr, true);
1074 errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"",
1075 relkind == RELKIND_SEQUENCE ? "sequence" : "table",
1076 rv->schemaname, rv->relname, sub->name));
1077 }
1078 }
1079
1080 /*
1081 * Next, remove state for tables we should not care about anymore, using
1082 * the data we collected above.
1083 */
1085
1086 for (off = 0; off < tbl_count; off++)
1087 {
1088 Oid relid = subrel_local_oids[off];
1089
1090 if (!bsearch(&relid, pubrel_local_oids,
1091 list_length(pubrels), sizeof(Oid), oid_cmp))
1092 {
1093 char state;
1094 XLogRecPtr statelsn;
1096
1097 /*
1098 * Lock pg_subscription_rel with AccessExclusiveLock to
1099 * prevent any race conditions with the apply worker
1100 * re-launching workers at the same time this code is trying
1101 * to remove those tables.
1102 *
1103 * Even if a new worker for this particular rel is restarted, it
1104 * won't be able to make any progress as we hold an exclusive
1105 * lock on pg_subscription_rel until the transaction ends. It
1106 * will simply exit as there is no corresponding rel entry.
1107 *
1108 * This locking also ensures that the state of rels won't
1109 * change till we are done with this refresh operation.
1110 */
1111 if (!rel)
1113
1114 /* Last known rel state. */
1115 state = GetSubscriptionRelState(sub->oid, relid, &statelsn);
1116
1117 RemoveSubscriptionRel(sub->oid, relid);
1118
1119 remove_rel->relid = relid;
1120 remove_rel->state = state;
1121
1123
1125
1126 /*
1127 * For READY state, we would have already dropped the
1128 * tablesync origin.
1129 */
1131 {
1132 char originname[NAMEDATALEN];
1133
1134 /*
1135 * Drop the tablesync's origin tracking if it exists.
1136 *
1137 * It is possible that the origin has not yet been created for
1138 * the tablesync worker; this can happen for the states before
1139 * SUBREL_STATE_DATASYNC. The tablesync worker or apply
1140 * worker can also concurrently try to drop the origin, and
1141 * by this time the origin might already be removed. For
1142 * these reasons, we pass missing_ok = true.
1143 */
1145 sizeof(originname));
1146 replorigin_drop_by_name(originname, true, false);
1147 }
1148
1150 (errmsg_internal("table \"%s.%s\" removed from subscription \"%s\"",
1152 get_rel_name(relid),
1153 sub->name)));
1154 }
1155 }
1156
1157 /*
1158 * Drop the tablesync slots associated with removed tables. This has
1159 * to be at the end because otherwise if there is an error while doing
1160 * the database operations we won't be able to rollback dropped slots.
1161 */
1163 {
1164 if (sub_remove_rel->state != SUBREL_STATE_READY &&
1166 {
1167 char syncslotname[NAMEDATALEN] = {0};
1168
1169 /*
1170 * For READY/SYNCDONE states we know the tablesync slot has
1171 * already been dropped by the tablesync worker.
1172 *
1173 * For other states, there is no certainty; the slot may not
1174 * exist yet. Also, if we fail after removing some of the
1175 * slots, the next attempt will again try to drop the
1176 * already-dropped slots and fail. For these reasons, we pass
1177 * missing_ok = true for the drop.
1178 */
1180 syncslotname, sizeof(syncslotname));
1182 }
1183 }
1184
1185 /*
1186 * Next, remove state for sequences we should not care about anymore,
1187 * using the data we collected above.
1188 */
1189 for (off = 0; off < seq_count; off++)
1190 {
1191 Oid relid = subseq_local_oids[off];
1192
1193 if (!bsearch(&relid, pubrel_local_oids,
1194 list_length(pubrels), sizeof(Oid), oid_cmp))
1195 {
1196 /*
1197 * This locking ensures that the state of rels won't change
1198 * till we are done with this refresh operation.
1199 */
1200 if (!rel)
1202
1203 RemoveSubscriptionRel(sub->oid, relid);
1204
1206 errmsg_internal("sequence \"%s.%s\" removed from subscription \"%s\"",
1208 get_rel_name(relid),
1209 sub->name));
1210 }
1211 }
1212 }
1213 PG_FINALLY();
1214 {
1216 }
1217 PG_END_TRY();
1218
1219 if (rel)
1220 table_close(rel, NoLock);
1221}
1222
1223/*
1224 * Marks all sequences with INIT state.
1225 */
1226static void
1228{
1229 char *err = NULL;
1231 bool must_use_password;
1232
1233 /* Load the library providing us libpq calls. */
1234 load_file("libpqwalreceiver", false);
1235
1236 /* Try to connect to the publisher. */
1238 wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
1239 sub->name, &err);
1240 if (!wrconn)
1241 ereport(ERROR,
1243 errmsg("subscription \"%s\" could not connect to the publisher: %s",
1244 sub->name, err));
1245
1246 PG_TRY();
1247 {
1249
1251 sub->origin, NULL, 0, sub->name);
1252
1253 /* Get local sequence list. */
1254 subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
1256 {
1257 Oid relid = subrel->relid;
1258
1260 InvalidXLogRecPtr, false);
1262 errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
1264 get_rel_name(relid),
1265 sub->name));
1266 }
1267 }
1268 PG_FINALLY();
1269 {
1271 }
1272 PG_END_TRY();
1273}
1274
1275/*
1276 * Common checks for altering failover, two_phase, and retain_dead_tuples
1277 * options.
1278 */
1279static void
1281 bool slot_needs_update, bool isTopLevel)
1282{
1283 Assert(strcmp(option, "failover") == 0 ||
1284 strcmp(option, "two_phase") == 0 ||
1285 strcmp(option, "retain_dead_tuples") == 0);
1286
1287 /*
1288 * Altering the retain_dead_tuples option does not update the slot on the
1289 * publisher.
1290 */
1291 Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
1292
1293 /*
1294 * Do not allow changing the option if the subscription is enabled. This
1295 * is because both failover and two_phase options of the slot on the
1296 * publisher cannot be modified if the slot is currently acquired by the
1297 * existing walsender.
1298 *
1299 * Note that two_phase is enabled (aka changed from 'false' to 'true') on
1300 * the publisher by the existing walsender, so we could have allowed that
1301 * even when the subscription is enabled. But we kept this restriction for
1302 * the sake of consistency and simplicity.
1303 *
1304 * Additionally, do not allow changing the retain_dead_tuples option when
1305 * the subscription is enabled to prevent race conditions arising from the
1306 * new option value being acknowledged asynchronously by the launcher and
1307 * apply workers.
1308 *
1309 * Without the restriction, a race condition may arise when a user
1310 * disables and immediately re-enables the retain_dead_tuples option. In
1311 * this case, the launcher might drop the slot upon noticing the disabled
1312 * action, while the apply worker may keep maintaining
1313 * oldest_nonremovable_xid without noticing the option change. During this
1314 * period, a transaction ID wraparound could falsely make this ID appear
1315 * as if it originates from the future w.r.t the transaction ID stored in
1316 * the slot maintained by launcher.
1317 *
1318 * Similarly, if the user enables retain_dead_tuples concurrently with the
1319 * launcher starting the worker, the apply worker may start calculating
1320 * oldest_nonremovable_xid before the launcher notices the enable action.
1321 * Consequently, the launcher may update slot.xmin to a newer value than
1322 * that maintained by the worker. In subsequent cycles, upon integrating
1323 * the worker's oldest_nonremovable_xid, the launcher might detect a
1324 * retreat in the calculated xmin, necessitating additional handling.
1325 *
1326 * XXX To address the above race conditions, we can define
1327 * oldest_nonremovable_xid as FullTransactionId and add a check to
1328 * disallow retreating the conflict slot's xmin. For now, we kept the
1329 * implementation simple by disallowing changes to retain_dead_tuples,
1330 * but in the future we can change this after some more analysis.
1331 *
1332 * Note that we could restrict only the enabling of retain_dead_tuples to
1333 * avoid the race conditions described above, but we maintain the
1334 * restriction for both enable and disable operations for the sake of
1335 * consistency.
1336 */
1337 if (sub->enabled)
1338 ereport(ERROR,
1340 errmsg("cannot set option \"%s\" for enabled subscription",
1341 option)));
1342
1344 {
1345 StringInfoData cmd;
1346
1347 /*
1348 * A valid slot must be associated with the subscription for us to
1349 * modify any of the slot's properties.
1350 */
1351 if (!sub->slotname)
1352 ereport(ERROR,
1354 errmsg("cannot set option \"%s\" for a subscription that does not have a slot name",
1355 option)));
1356
1357 /* The changed option of the slot can't be rolled back. */
1358 initStringInfo(&cmd);
1359 appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
1360
1362 pfree(cmd.data);
1363 }
1364}
1365
1366/*
1367 * Alter the existing subscription.
1368 */
1371 bool isTopLevel)
1372{
1373 Relation rel;
1375 bool nulls[Natts_pg_subscription];
1378 HeapTuple tup;
1379 Oid subid;
1380 bool update_tuple = false;
1381 bool update_failover = false;
1382 bool update_two_phase = false;
1383 bool check_pub_rdt = false;
1384 bool retain_dead_tuples;
1385 int max_retention;
1386 bool retention_active;
1387 char *origin;
1388 Subscription *sub;
1391 SubOpts opts = {0};
1392
1394
1395 /* Fetch the existing tuple. */
1397 CStringGetDatum(stmt->subname));
1398
1399 if (!HeapTupleIsValid(tup))
1400 ereport(ERROR,
1402 errmsg("subscription \"%s\" does not exist",
1403 stmt->subname)));
1404
1406 subid = form->oid;
1407
1408 /* must be owner */
1411 stmt->subname);
1412
1413 sub = GetSubscription(subid, false);
1414
1416 origin = sub->origin;
1419
1420 /*
1421 * Don't allow non-superuser modification of a subscription with
1422 * password_required=false.
1423 */
1424 if (!sub->passwordrequired && !superuser())
1425 ereport(ERROR,
1427 errmsg("password_required=false is superuser-only"),
1428 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1429
1430 /* Lock the subscription so nobody else can do anything with it. */
1432
1433 /* Form a new tuple. */
1434 memset(values, 0, sizeof(values));
1435 memset(nulls, false, sizeof(nulls));
1436 memset(replaces, false, sizeof(replaces));
1437
1438 switch (stmt->kind)
1439 {
1441 {
1452
1453 parse_subscription_options(pstate, stmt->options,
1455
1456 if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1457 {
1458 /*
1459 * The subscription must be disabled to allow slot_name as
1460 * 'none'; otherwise, the apply worker will repeatedly try
1461 * to stream the data using that slot_name, which neither
1462 * exists on the publisher nor can be created there by the
1463 * user.
1464 */
1465 if (sub->enabled && !opts.slot_name)
1466 ereport(ERROR,
1468 errmsg("cannot set %s for enabled subscription",
1469 "slot_name = NONE")));
1470
1471 if (opts.slot_name)
1474 else
1475 nulls[Anum_pg_subscription_subslotname - 1] = true;
1477 }
1478
1479 if (opts.synchronous_commit)
1480 {
1482 CStringGetTextDatum(opts.synchronous_commit);
1484 }
1485
1486 if (IsSet(opts.specified_opts, SUBOPT_BINARY))
1487 {
1489 BoolGetDatum(opts.binary);
1491 }
1492
1493 if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
1494 {
1496 CharGetDatum(opts.streaming);
1498 }
1499
1500 if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
1501 {
1503 = BoolGetDatum(opts.disableonerr);
1505 = true;
1506 }
1507
1508 if (IsSet(opts.specified_opts, SUBOPT_PASSWORD_REQUIRED))
1509 {
1510 /* Non-superuser may not disable password_required. */
1511 if (!opts.passwordrequired && !superuser())
1512 ereport(ERROR,
1514 errmsg("password_required=false is superuser-only"),
1515 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
1516
1518 = BoolGetDatum(opts.passwordrequired);
1520 = true;
1521 }
1522
1523 if (IsSet(opts.specified_opts, SUBOPT_RUN_AS_OWNER))
1524 {
1526 BoolGetDatum(opts.runasowner);
1528 }
1529
1530 if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
1531 {
1532 /*
1533 * We need to update both the slot and the subscription
1534 * for the two_phase option. We can enable the two_phase
1535 * option for a slot only once the initial data
1536 * synchronization is done. This is to avoid missing some
1537 * data as explained in comments atop worker.c.
1538 */
1539 update_two_phase = !opts.twophase;
1540
1541 CheckAlterSubOption(sub, "two_phase", update_two_phase,
1542 isTopLevel);
1543
1544 /*
1545 * Modifying the two_phase slot option requires a slot
1546 * lookup by slot name, so changing the slot name at the
1547 * same time is not allowed.
1548 */
1549 if (update_two_phase &&
1550 IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
1551 ereport(ERROR,
1553 errmsg("\"slot_name\" and \"two_phase\" cannot be altered at the same time")));
1554
1555 /*
1556 * Note that workers may still survive even if the
1557 * subscription has been disabled.
1558 *
1559 * Ensure workers have already exited to avoid
1560 * getting prepared transactions while we are disabling
1561 * the two_phase option. Otherwise, the changes of an
1562 * already prepared transaction can be replicated again
1563 * along with its corresponding commit, leading to
1564 * duplicate data or errors.
1565 */
1566 if (logicalrep_workers_find(subid, true, true))
1567 ereport(ERROR,
1569 errmsg("cannot alter \"two_phase\" when logical replication worker is still running"),
1570 errhint("Try again after some time.")));
1571
1572 /*
1573 * two_phase cannot be disabled if there are any
1574 * uncommitted prepared transactions present; otherwise, it
1575 * can lead to duplicate data or errors as explained in
1576 * the comment above.
1577 */
1578 if (update_two_phase &&
1580 LookupGXactBySubid(subid))
1581 ereport(ERROR,
1583 errmsg("cannot disable \"two_phase\" when prepared transactions exist"),
1584 errhint("Resolve these transactions and try again.")));
1585
1586 /* Change system catalog accordingly */
1588 CharGetDatum(opts.twophase ?
1592 }
1593
1594 if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
1595 {
1596 /*
1597 * Similar to the two_phase case above, we need to update
1598 * the failover option for both the slot and the
1599 * subscription.
1600 */
1601 update_failover = true;
1602
1603 CheckAlterSubOption(sub, "failover", update_failover,
1604 isTopLevel);
1605
1607 BoolGetDatum(opts.failover);
1609 }
1610
1611 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
1612 {
1614 BoolGetDatum(opts.retaindeadtuples);
1616
1617 /*
1618 * Update the retention status only if there's a change in
1619 * the retain_dead_tuples option value.
1620 *
1621 * Automatically marking retention as active when
1622 * retain_dead_tuples is enabled may not always be ideal,
1623 * especially if retention was previously stopped and the
1624 * user toggles retain_dead_tuples without adjusting the
1625 * publisher workload. However, this behavior provides a
1626 * convenient way for users to manually refresh the
1627 * retention status. Since retention will be stopped again
1628 * unless the publisher workload is reduced, this approach
1629 * is acceptable for now.
1630 */
1631 if (opts.retaindeadtuples != sub->retaindeadtuples)
1632 {
1634 BoolGetDatum(opts.retaindeadtuples);
1636
1637 retention_active = opts.retaindeadtuples;
1638 }
1639
1640 CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
1641
1642 /*
1643 * Workers may continue running even after the
1644 * subscription has been disabled.
1645 *
1646 * To prevent race conditions (as described in
1647 * CheckAlterSubOption()), ensure that all worker
1648 * processes have already exited before proceeding.
1649 */
1650 if (logicalrep_workers_find(subid, true, true))
1651 ereport(ERROR,
1653 errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
1654 errhint("Try again after some time.")));
1655
1656 /*
1657 * Notify the launcher to manage the replication slot for
1658 * conflict detection. This ensures that replication slot
1659 * is efficiently handled (created, updated, or dropped)
1660 * in response to any configuration changes.
1661 */
1663
1664 check_pub_rdt = opts.retaindeadtuples;
1665 retain_dead_tuples = opts.retaindeadtuples;
1666 }
1667
1668 if (IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1669 {
1671 Int32GetDatum(opts.maxretention);
1673
1674 max_retention = opts.maxretention;
1675 }
1676
1677 /*
1678 * Ensure that system configuration parameters are set
1679 * appropriately to support retain_dead_tuples and
1680 * max_retention_duration.
1681 */
1682 if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES) ||
1683 IsSet(opts.specified_opts, SUBOPT_MAX_RETENTION_DURATION))
1687 (max_retention > 0));
1688
1689 if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
1690 {
1692 CStringGetTextDatum(opts.origin);
1694
1695 /*
1696 * Check if changes from different origins may be received
1697 * from the publisher when the origin is changed to ANY
1698 * and retain_dead_tuples is enabled.
1699 */
1702
1703 origin = opts.origin;
1704 }
1705
1706 if (IsSet(opts.specified_opts, SUBOPT_WAL_RECEIVER_TIMEOUT))
1707 {
1709 CStringGetTextDatum(opts.wal_receiver_timeout);
1711 }
1712
1713 update_tuple = true;
1714 break;
1715 }
1716
1718 {
1719 parse_subscription_options(pstate, stmt->options,
1721 Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
1722
1723 if (!sub->slotname && opts.enabled)
1724 ereport(ERROR,
1726 errmsg("cannot enable subscription that does not have a slot name")));
1727
1728 /*
1729 * Check track_commit_timestamp only when enabling the
1730 * subscription in case it was disabled after creation. See
1731 * comments atop CheckSubDeadTupleRetention() for details.
1732 */
1733 CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
1735 sub->retentionactive, false);
1736
1738 BoolGetDatum(opts.enabled);
1740
1741 if (opts.enabled)
1743
1744 update_tuple = true;
1745
1746 /*
1747 * The subscription might be initially created with
1748 * connect=false and retain_dead_tuples=true, meaning the
1749 * remote server's status may not be checked. Ensure this
1750 * check is conducted now.
1751 */
1752 check_pub_rdt = sub->retaindeadtuples && opts.enabled;
1753 break;
1754 }
1755
1757 /* Load the library providing us libpq calls. */
1758 load_file("libpqwalreceiver", false);
1759 /* Check the connection info string. */
1760 walrcv_check_conninfo(stmt->conninfo,
1761 sub->passwordrequired && !sub->ownersuperuser);
1762
1764 CStringGetTextDatum(stmt->conninfo);
1766 update_tuple = true;
1767
1768 /*
1769 * Since the remote server configuration might have changed,
1770 * perform a check to ensure it permits enabling
1771 * retain_dead_tuples.
1772 */
1774 break;
1775
1777 {
1779 parse_subscription_options(pstate, stmt->options,
1781
1783 publicationListToArray(stmt->publication);
1785
1786 update_tuple = true;
1787
1788 /* Refresh if user asked us to. */
1789 if (opts.refresh)
1790 {
1791 if (!sub->enabled)
1792 ereport(ERROR,
1794 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1795 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
1796
1797 /*
1798 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1799 * why this is not allowed.
1800 */
1801 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1802 ereport(ERROR,
1804 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1805 errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1806
1807 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1808
1809 /* Make sure refresh sees the new list of publications. */
1810 sub->publications = stmt->publication;
1811
1812 AlterSubscription_refresh(sub, opts.copy_data,
1813 stmt->publication);
1814 }
1815
1816 break;
1817 }
1818
1821 {
1822 List *publist;
1824
1826 parse_subscription_options(pstate, stmt->options,
1828
1829 publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
1833
1834 update_tuple = true;
1835
1836 /* Refresh if user asked us to. */
1837 if (opts.refresh)
1838 {
1839 /* We only need to validate user specified publications. */
1840 List *validate_publications = (isadd) ? stmt->publication : NULL;
1841
1842 if (!sub->enabled)
1843 ereport(ERROR,
1845 errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
1846 /* translator: %s is an SQL ALTER command */
1847 errhint("Use %s instead.",
1848 isadd ?
1849 "ALTER SUBSCRIPTION ... ADD PUBLICATION ... WITH (refresh = false)" :
1850 "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)")));
1851
1852 /*
1853 * See ALTER_SUBSCRIPTION_REFRESH_PUBLICATION for details
1854 * why this is not allowed.
1855 */
1856 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1857 ereport(ERROR,
1859 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
1860 /* translator: %s is an SQL ALTER command */
1861 errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.",
1862 isadd ?
1863 "ALTER SUBSCRIPTION ... ADD PUBLICATION" :
1864 "ALTER SUBSCRIPTION ... DROP PUBLICATION")));
1865
1866 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh");
1867
1868 /* Refresh the new list of publications. */
1869 sub->publications = publist;
1870
1871 AlterSubscription_refresh(sub, opts.copy_data,
1873 }
1874
1875 break;
1876 }
1877
1879 {
1880 if (!sub->enabled)
1881 ereport(ERROR,
1883 errmsg("%s is not allowed for disabled subscriptions",
1884 "ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
1885
1886 parse_subscription_options(pstate, stmt->options,
1888
1889 /*
1890 * The subscription option "two_phase" requires that
1891 * replication has passed the initial table synchronization
1892 * phase before the two_phase becomes properly enabled.
1893 *
1894 * But, having reached this two-phase commit "enabled" state
1895 * we must not allow any subsequent table initialization to
1896 * occur. So the ALTER SUBSCRIPTION ... REFRESH PUBLICATION is
1897 * disallowed when the user had requested two_phase = on mode.
1898 *
1899 * The exception to this restriction is when copy_data =
1900 * false, because when copy_data is false the tablesync will
1901 * start already in READY state and will exit directly without
1902 * doing anything.
1903 *
1904 * For more details see comments atop worker.c.
1905 */
1906 if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
1907 ereport(ERROR,
1909 errmsg("ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data is not allowed when two_phase is enabled"),
1910 errhint("Use ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data = false, or use DROP/CREATE SUBSCRIPTION.")));
1911
1912 PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION");
1913
1914 AlterSubscription_refresh(sub, opts.copy_data, NULL);
1915
1916 break;
1917 }
1918
1920 {
1921 if (!sub->enabled)
1922 ereport(ERROR,
1924 errmsg("%s is not allowed for disabled subscriptions",
1925 "ALTER SUBSCRIPTION ... REFRESH SEQUENCES"));
1926
1928
1929 break;
1930 }
1931
1933 {
1934 parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
1935
1936 /* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
1937 Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
1938
1939 /*
1940 * If the user sets subskiplsn, we do a sanity check to make
1941 * sure that the specified LSN is a probable value.
1942 */
1943 if (XLogRecPtrIsValid(opts.lsn))
1944 {
1946 char originname[NAMEDATALEN];
1947 XLogRecPtr remote_lsn;
1948
1950 originname, sizeof(originname));
1952 remote_lsn = replorigin_get_progress(originid, false);
1953
1954 /* Check the given LSN is at least a future LSN */
1955 if (XLogRecPtrIsValid(remote_lsn) && opts.lsn < remote_lsn)
1956 ereport(ERROR,
1958 errmsg("skip WAL location (LSN %X/%08X) must be greater than origin LSN %X/%08X",
1959 LSN_FORMAT_ARGS(opts.lsn),
1960 LSN_FORMAT_ARGS(remote_lsn))));
1961 }
1962
1965
1966 update_tuple = true;
1967 break;
1968 }
1969
1970 default:
1971 elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
1972 stmt->kind);
1973 }
1974
1975 /* Update the catalog if needed. */
1976 if (update_tuple)
1977 {
1979 replaces);
1980
1981 CatalogTupleUpdate(rel, &tup->t_self, tup);
1982
1984 }
1985
1986 /*
1987 * Try to acquire the connection necessary either for modifying the slot
1988 * or for checking if the remote server permits enabling
1989 * retain_dead_tuples.
1990 *
1991 * This has to be at the end because otherwise, if there is an error while
1992 * doing the database operations, we won't be able to roll back the altered
1993 * slot.
1994 */
1996 {
1997 bool must_use_password;
1998 char *err;
2000
2001 /* Load the library providing us libpq calls. */
2002 load_file("libpqwalreceiver", false);
2003
2004 /*
2005 * Try to connect to the publisher, using the new connection string if
2006 * available.
2007 */
2009 wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
2010 true, true, must_use_password, sub->name,
2011 &err);
2012 if (!wrconn)
2013 ereport(ERROR,
2015 errmsg("subscription \"%s\" could not connect to the publisher: %s",
2016 sub->name, err)));
2017
2018 PG_TRY();
2019 {
2022
2024 retain_dead_tuples, origin, NULL, 0,
2025 sub->name);
2026
2029 update_failover ? &opts.failover : NULL,
2030 update_two_phase ? &opts.twophase : NULL);
2031 }
2032 PG_FINALLY();
2033 {
2035 }
2036 PG_END_TRY();
2037 }
2038
2040
2042
2044
2045 /* Wake up related replication workers to handle this change quickly. */
2047
2048 return myself;
2049}
2050
2051/*
2052 * Drop a subscription
2053 */
2054void
2056{
2057 Relation rel;
2059 HeapTuple tup;
2060 Oid subid;
2061 Oid subowner;
2062 Datum datum;
2063 bool isnull;
2064 char *subname;
2065 char *conninfo;
2066 char *slotname;
2068 ListCell *lc;
2069 char originname[NAMEDATALEN];
2070 char *err = NULL;
2073 List *rstates;
2074 bool must_use_password;
2075
2076 /*
2077 * The launcher may concurrently start a new worker for this subscription.
2078 * During initialization, the worker checks for subscription validity and
2079 * exits if the subscription has already been dropped. See
2080 * InitializeLogRepWorker.
2081 */
2083
2085 CStringGetDatum(stmt->subname));
2086
2087 if (!HeapTupleIsValid(tup))
2088 {
2089 table_close(rel, NoLock);
2090
2091 if (!stmt->missing_ok)
2092 ereport(ERROR,
2094 errmsg("subscription \"%s\" does not exist",
2095 stmt->subname)));
2096 else
2098 (errmsg("subscription \"%s\" does not exist, skipping",
2099 stmt->subname)));
2100
2101 return;
2102 }
2103
2105 subid = form->oid;
2106 subowner = form->subowner;
2107 must_use_password = !superuser_arg(subowner) && form->subpasswordrequired;
2108
2109 /* must be owner */
2112 stmt->subname);
2113
2114 /* DROP hook for the subscription being removed */
2116
2117 /*
2118 * Lock the subscription so nobody else can do anything with it (including
2119 * the replication workers).
2120 */
2122
2123 /* Get subname */
2126 subname = pstrdup(NameStr(*DatumGetName(datum)));
2127
2128 /* Get conninfo */
2131 conninfo = TextDatumGetCString(datum);
2132
2133 /* Get slotname */
2136 if (!isnull)
2137 slotname = pstrdup(NameStr(*DatumGetName(datum)));
2138 else
2139 slotname = NULL;
2140
2141 /*
2142 * Since dropping a replication slot is not transactional, the replication
2143 * slot stays dropped even if the transaction rolls back. So we cannot
2144 * run DROP SUBSCRIPTION inside a transaction block if dropping the
2145 * replication slot. Also, in this case, we report a message for dropping
2146 * the subscription to the cumulative stats system.
2147 *
2148 * XXX The command name should really be something like "DROP SUBSCRIPTION
2149 * of a subscription that is associated with a replication slot", but we
2150 * don't have the proper facilities for that.
2151 */
2152 if (slotname)
2153 PreventInTransactionBlock(isTopLevel, "DROP SUBSCRIPTION");
2154
2157
2158 /* Remove the tuple from catalog. */
2159 CatalogTupleDelete(rel, &tup->t_self);
2160
2162
2163 /*
2164 * Stop all the subscription workers immediately.
2165 *
2166 * This is necessary if we are dropping the replication slot, so that the
2167 * slot becomes accessible.
2168 *
2169 * It is also necessary if the subscription is disabled and was disabled
2170 * in the same transaction. Then the workers haven't seen the disabling
2171 * yet and will still be running, leading to hangs later when we want to
2172 * drop the replication origin. If the subscription was disabled before
2173 * this transaction, then there shouldn't be any workers left, so this
2174 * won't make a difference.
2175 *
2176 * New workers won't be started because we hold an exclusive lock on the
2177 * subscription till the end of the transaction.
2178 */
2179 subworkers = logicalrep_workers_find(subid, false, true);
2180 foreach(lc, subworkers)
2181 {
2183
2185 }
2187
2188 /*
2189 * Remove the no-longer-useful entry in the launcher's table of apply
2190 * worker start times.
2191 *
2192 * If this transaction rolls back, the launcher might restart a failed
2193 * apply worker before wal_retrieve_retry_interval milliseconds have
2194 * elapsed, but that's pretty harmless.
2195 */
2197
2198 /*
2199 * Cleanup of tablesync replication origins.
2200 *
2201 * Any READY-state relations would already have dealt with clean-ups.
2202 *
2203 * Note that the state can't change because we have already stopped both
2204 * the apply and tablesync workers and they can't restart because of
2205 * exclusive lock on the subscription.
2206 */
2207 rstates = GetSubscriptionRelations(subid, true, false, true);
2208 foreach(lc, rstates)
2209 {
2211 Oid relid = rstate->relid;
2212
2213 /* Only cleanup resources of tablesync workers */
2214 if (!OidIsValid(relid))
2215 continue;
2216
2217 /*
2218 * Drop the tablesync's origin tracking if it exists.
2219 *
2220 * It is possible that the origin is not yet created for the tablesync
2221 * worker, so pass missing_ok = true. This can happen for the states
2222 * before SUBREL_STATE_DATASYNC.
2223 */
2225 sizeof(originname));
2226 replorigin_drop_by_name(originname, true, false);
2227 }
2228
2229 /* Clean up dependencies */
2231
2232 /* Remove any associated relation synchronization states. */
2234
2235 /* Remove the origin tracking if it exists. */
2237 replorigin_drop_by_name(originname, true, false);
2238
2239 /*
2240 * Tell the cumulative stats system that the subscription is getting
2241 * dropped.
2242 */
2244
2245 /*
2246 * If there is no slot associated with the subscription, we can finish
2247 * here.
2248 */
2249 if (!slotname && rstates == NIL)
2250 {
2251 table_close(rel, NoLock);
2252 return;
2253 }
2254
2255 /*
2256 * Try to acquire the connection necessary for dropping slots.
2257 *
2258 * Note: If the slotname is NONE/NULL then we allow the command to finish
2259 * and users need to manually clean up the apply and tablesync worker slots
2260 * later.
2261 *
2262 * This has to be at the end because otherwise, if there is an error while
2263 * doing the database operations, we won't be able to roll back the dropped
2264 * slot.
2265 */
2266 load_file("libpqwalreceiver", false);
2267
2268 wrconn = walrcv_connect(conninfo, true, true, must_use_password,
2269 subname, &err);
2270 if (wrconn == NULL)
2271 {
2272 if (!slotname)
2273 {
2274 /* be tidy */
2276 table_close(rel, NoLock);
2277 return;
2278 }
2279 else
2280 {
2281 ReportSlotConnectionError(rstates, subid, slotname, err);
2282 }
2283 }
2284
2285 PG_TRY();
2286 {
2287 foreach(lc, rstates)
2288 {
2290 Oid relid = rstate->relid;
2291
2292 /* Only cleanup resources of tablesync workers */
2293 if (!OidIsValid(relid))
2294 continue;
2295
2296 /*
2297 * Drop the tablesync slots associated with removed tables.
2298 *
2299 * For SYNCDONE/READY states, the tablesync slot is known to have
2300 * already been dropped by the tablesync worker.
2301 *
2302 * For other states, there is no certainty, maybe the slot does
2303 * not exist yet. Also, if we fail after removing some of the
2304 * slots, next time, it will again try to drop already dropped
2305 * slots and fail. For these reasons, we allow missing_ok = true
2306 * for the drop.
2307 */
2308 if (rstate->state != SUBREL_STATE_SYNCDONE)
2309 {
2310 char syncslotname[NAMEDATALEN] = {0};
2311
2313 sizeof(syncslotname));
2315 }
2316 }
2317
2319
2320 /*
2321 * If there is a slot associated with the subscription, then drop the
2322 * replication slot at the publisher.
2323 */
2324 if (slotname)
2325 ReplicationSlotDropAtPubNode(wrconn, slotname, false);
2326 }
2327 PG_FINALLY();
2328 {
2330 }
2331 PG_END_TRY();
2332
2333 table_close(rel, NoLock);
2334}
2335
2336/*
2337 * Drop the replication slot at the publisher node using the replication
2338 * connection.
2339 *
2340 * missing_ok - if true then only issue a LOG message if the slot doesn't
2341 * exist.
2342 */
2343void
2344ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
2345{
2346 StringInfoData cmd;
2347
2348 Assert(wrconn);
2349
2350 load_file("libpqwalreceiver", false);
2351
2352 initStringInfo(&cmd);
2353 appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
2354
2355 PG_TRY();
2356 {
2357 WalRcvExecResult *res;
2358
2359 res = walrcv_exec(wrconn, cmd.data, 0, NULL);
2360
2361 if (res->status == WALRCV_OK_COMMAND)
2362 {
2363 /* NOTICE. Success. */
2365 (errmsg("dropped replication slot \"%s\" on publisher",
2366 slotname)));
2367 }
2368 else if (res->status == WALRCV_ERROR &&
2369 missing_ok &&
2371 {
2372 /* LOG. Error, but missing_ok = true. */
2373 ereport(LOG,
2374 (errmsg("could not drop replication slot \"%s\" on publisher: %s",
2375 slotname, res->err)));
2376 }
2377 else
2378 {
2379 /* ERROR. */
2380 ereport(ERROR,
2382 errmsg("could not drop replication slot \"%s\" on publisher: %s",
2383 slotname, res->err)));
2384 }
2385
2387 }
2388 PG_FINALLY();
2389 {
2390 pfree(cmd.data);
2391 }
2392 PG_END_TRY();
2393}
2394
2395/*
2396 * Internal workhorse for changing a subscription owner
2397 */
2398static void
2400{
2403
2405
2406 if (form->subowner == newOwnerId)
2407 return;
2408
2411 NameStr(form->subname));
2412
2413 /*
2414 * Don't allow non-superuser modification of a subscription with
2415 * password_required=false.
2416 */
2417 if (!form->subpasswordrequired && !superuser())
2418 ereport(ERROR,
2420 errmsg("password_required=false is superuser-only"),
2421 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
2422
2423 /* Must be able to become new owner */
2425
2426 /*
2427 * current owner must have CREATE on database
2428 *
2429 * This is consistent with how ALTER SCHEMA ... OWNER TO works, but some
2430 * other object types behave differently (e.g. you can't give a table to a
2431 * user who lacks CREATE privileges on a schema).
2432 */
2435 if (aclresult != ACLCHECK_OK)
2438
2439 form->subowner = newOwnerId;
2440 CatalogTupleUpdate(rel, &tup->t_self, tup);
2441
2442 /* Update owner dependency reference */
2444 form->oid,
2445 newOwnerId);
2446
2448 form->oid, 0);
2449
2450 /* Wake up related background processes to handle this change quickly. */
2453}
2454
2455/*
2456 * Change subscription owner -- by name
2457 */
2460{
2461 Oid subid;
2462 HeapTuple tup;
2463 Relation rel;
2464 ObjectAddress address;
2466
2468
2471
2472 if (!HeapTupleIsValid(tup))
2473 ereport(ERROR,
2475 errmsg("subscription \"%s\" does not exist", name)));
2476
2478 subid = form->oid;
2479
2481
2483
2485
2487
2488 return address;
2489}
2490
2491/*
2492 * Change subscription owner -- by OID
2493 */
2494void
2496{
2497 HeapTuple tup;
2498 Relation rel;
2499
2501
2503
2504 if (!HeapTupleIsValid(tup))
2505 ereport(ERROR,
2507 errmsg("subscription with OID %u does not exist", subid)));
2508
2510
2512
2514}
2515
2516/*
2517 * Check and log a warning if the publisher has subscribed to the same table,
2518 * its partition ancestors (if it's a partition), or its partition children (if
2519 * it's a partitioned table), from some other publishers. This check is
2520 * required in the following scenarios:
2521 *
2522 * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2523 * statements with "copy_data = true" and "origin = none":
2524 * - Warn the user that data with an origin might have been copied.
2525 * - This check is skipped for tables already added, as incremental sync via
2526 * WAL allows origin tracking. The list of such tables is in
2527 * subrel_local_oids.
2528 *
2529 * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH PUBLICATION
2530 * statements with "retain_dead_tuples = true" and "origin = any", and for
2531 * ALTER SUBSCRIPTION statements that modify retain_dead_tuples or origin,
2532 * or when the publisher's status changes (e.g., due to a connection string
2533 * update):
2534 * - Warn the user that only conflict detection info for local changes on
2535 * the publisher is retained. Data from other origins may lack sufficient
2536 * details for reliable conflict detection.
2537 * - See comments atop worker.c for more details.
2538 */
2539static void
2541 bool copydata, bool retain_dead_tuples,
2542 char *origin, Oid *subrel_local_oids,
2543 int subrel_count, char *subname)
2544{
2545 WalRcvExecResult *res;
2546 StringInfoData cmd;
2547 TupleTableSlot *slot;
2548 Oid tableRow[1] = {TEXTOID};
2549 List *publist = NIL;
2550 int i;
2551 bool check_rdt;
2552 bool check_table_sync;
2553 bool origin_none = origin &&
2555
2556 /*
2557 * Enable retain_dead_tuples checks only when origin is set to 'any',
2558 * since with origin='none' only local changes are replicated to the
2559 * subscriber.
2560 */
2562
2563 /*
2564 * Enable table synchronization checks only when origin is 'none', to
2565 * ensure that data from other origins is not inadvertently copied.
2566 */
2568
2569 /* retain_dead_tuples and table sync checks occur separately */
2571
2572 /* Return if no checks are required */
2573 if (!check_rdt && !check_table_sync)
2574 return;
2575
2576 initStringInfo(&cmd);
2578 "SELECT DISTINCT P.pubname AS pubname\n"
2579 "FROM pg_publication P,\n"
2580 " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
2581 " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid OR"
2582 " GPT.relid IN (SELECT relid FROM pg_partition_ancestors(PS.srrelid) UNION"
2583 " SELECT relid FROM pg_partition_tree(PS.srrelid))),\n"
2584 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2585 "WHERE C.oid = GPT.relid AND P.pubname IN (");
2586 GetPublicationsStr(publications, &cmd, true);
2587 appendStringInfoString(&cmd, ")\n");
2588
2589 /*
2590 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2591 * subrel_local_oids contains the list of relation oids that are already
2592 * present on the subscriber. This check should be skipped for these
2593 * tables if checking for table sync scenario. However, when handling the
2594 * retain_dead_tuples scenario, ensure all tables are checked, as some
2595 * existing tables may now include changes from other origins due to newly
2596 * created subscriptions on the publisher.
2597 */
2598 if (check_table_sync)
2599 {
2600 for (i = 0; i < subrel_count; i++)
2601 {
2602 Oid relid = subrel_local_oids[i];
2603 char *schemaname = get_namespace_name(get_rel_namespace(relid));
2604 char *tablename = get_rel_name(relid);
2605
2606 appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2607 schemaname, tablename);
2608 }
2609 }
2610
2611 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2612 pfree(cmd.data);
2613
2614 if (res->status != WALRCV_OK_TUPLES)
2615 ereport(ERROR,
2617 errmsg("could not receive list of replicated tables from the publisher: %s",
2618 res->err)));
2619
2620 /* Process publications. */
2622 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2623 {
2624 char *pubname;
2625 bool isnull;
2626
2627 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2628 Assert(!isnull);
2629
2630 ExecClearTuple(slot);
2632 }
2633
2634 /*
2635 * Log a warning if the publisher has subscribed to the same table from
2636 * some other publisher. We cannot know the origin of data during the
2637 * initial sync. Data origins can be found only from the WAL by looking at
2638 * the origin id.
2639 *
2640 * XXX: For simplicity, we don't check whether the table has any data or
2641 * not. If the table doesn't have any data then we don't need to
2642 * distinguish between data having origin and data not having origin so we
2643 * can avoid logging a warning for table sync scenario.
2644 */
2645 if (publist)
2646 {
2648
2649 /* Prepare the list of publication(s) for warning message. */
2652
2653 if (check_table_sync)
2656 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2657 subname),
2658 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2659 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2661 errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
2662 else
2665 errmsg("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins",
2666 subname),
2667 errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
2668 "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
2670 errhint("Consider using origin = NONE or disabling retain_dead_tuples."));
2671 }
2672
2674
2676}
2677
2678/*
2679 * This function is similar to check_publications_origin_tables and serves
2680 * same purpose for sequences.
2681 */
2682static void
2684 bool copydata, char *origin,
2686 char *subname)
2687{
2688 WalRcvExecResult *res;
2689 StringInfoData cmd;
2690 TupleTableSlot *slot;
2691 Oid tableRow[1] = {TEXTOID};
2692 List *publist = NIL;
2693
2694 /*
2695 * Enable sequence synchronization checks only when origin is 'none' , to
2696 * ensure that sequence data from other origins is not inadvertently
2697 * copied. This check is necessary if the publisher is running PG19 or
2698 * later, where logical replication sequence synchronization is supported.
2699 */
2700 if (!copydata || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0 ||
2701 walrcv_server_version(wrconn) < 190000)
2702 return;
2703
2704 initStringInfo(&cmd);
2706 "SELECT DISTINCT P.pubname AS pubname\n"
2707 "FROM pg_publication P,\n"
2708 " LATERAL pg_get_publication_sequences(P.pubname) GPS\n"
2709 " JOIN pg_subscription_rel PS ON (GPS.relid = PS.srrelid),\n"
2710 " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
2711 "WHERE C.oid = GPS.relid AND P.pubname IN (");
2712
2713 GetPublicationsStr(publications, &cmd, true);
2714 appendStringInfoString(&cmd, ")\n");
2715
2716 /*
2717 * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
2718 * subrel_local_oids contains the list of relations that are already
2719 * present on the subscriber. This check should be skipped as these will
2720 * not be re-synced.
2721 */
2722 for (int i = 0; i < subrel_count; i++)
2723 {
2724 Oid relid = subrel_local_oids[i];
2725 char *schemaname = get_namespace_name(get_rel_namespace(relid));
2726 char *seqname = get_rel_name(relid);
2727
2728 appendStringInfo(&cmd,
2729 "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
2730 schemaname, seqname);
2731 }
2732
2733 res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
2734 pfree(cmd.data);
2735
2736 if (res->status != WALRCV_OK_TUPLES)
2737 ereport(ERROR,
2739 errmsg("could not receive list of replicated sequences from the publisher: %s",
2740 res->err)));
2741
2742 /* Process publications. */
2744 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2745 {
2746 char *pubname;
2747 bool isnull;
2748
2749 pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
2750 Assert(!isnull);
2751
2752 ExecClearTuple(slot);
2754 }
2755
2756 /*
2757 * Log a warning if the publisher has subscribed to the same sequence from
2758 * some other publisher. We cannot know the origin of sequences data
2759 * during the initial sync.
2760 */
2761 if (publist)
2762 {
2764
2765 /* Prepare the list of publication(s) for warning message. */
2768
2771 errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
2772 subname),
2773 errdetail_plural("The subscription subscribes to a publication (%s) that contains sequences that are written to by other subscriptions.",
2774 "The subscription subscribes to publications (%s) that contain sequences that are written to by other subscriptions.",
2776 errhint("Verify that initial data copied from the publisher sequences did not come from other origins."));
2777 }
2778
2780
2782}
2783
2784/*
2785 * Determine whether the retain_dead_tuples can be enabled based on the
2786 * publisher's status.
2787 *
2788 * This option is disallowed if the publisher is running a version earlier
2789 * than the PG19, or if the publisher is in recovery (i.e., it is a standby
2790 * server).
2791 *
2792 * See comments atop worker.c for a detailed explanation.
2793 */
2794static void
2796{
2797 WalRcvExecResult *res;
2798 Oid RecoveryRow[1] = {BOOLOID};
2799 TupleTableSlot *slot;
2800 bool isnull;
2801 bool remote_in_recovery;
2802
2803 if (walrcv_server_version(wrconn) < 190000)
2804 ereport(ERROR,
2806 errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
2807
2808 res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
2809
2810 if (res->status != WALRCV_OK_TUPLES)
2811 ereport(ERROR,
2813 errmsg("could not obtain recovery progress from the publisher: %s",
2814 res->err)));
2815
2817 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
2818 elog(ERROR, "failed to fetch tuple for the recovery progress");
2819
2820 remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
2821
2823 ereport(ERROR,
2825 errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
2826
2828
2830}
2831
2832/*
2833 * Check if the subscriber's configuration is adequate to enable the
2834 * retain_dead_tuples option.
2835 *
2836 * Issue an ERROR if the wal_level does not support the use of replication
2837 * slots when check_guc is set to true.
2838 *
2839 * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
2840 * set to true. This is only to highlight the importance of enabling
2841 * track_commit_timestamp instead of catching all the misconfigurations, as
2842 * this setting can be adjusted after subscription creation. Without it, the
2843 * apply worker will simply skip conflict detection.
2844 *
2845 * Issue a WARNING or NOTICE if the subscription is disabled and the retention
2846 * is active. Do not raise an ERROR since users can only modify
2847 * retain_dead_tuples for disabled subscriptions. And as long as the
2848 * subscription is enabled promptly, it will not pose issues.
2849 *
2850 * Issue a NOTICE to inform users that max_retention_duration is
2851 * ineffective when retain_dead_tuples is disabled for a subscription. An ERROR
2852 * is not issued because setting max_retention_duration causes no harm,
2853 * even when it is ineffective.
2854 */
2855void
2859 bool max_retention_set)
2860{
2863
2865 {
2867 ereport(ERROR,
2869 errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
2870 errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
2871
2875 errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
2876 errhint("Consider setting \"%s\" to true.",
2877 "track_commit_timestamp"));
2878
2882 errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
2884 ? errhint("Consider setting %s to false.",
2885 "retain_dead_tuples") : 0);
2886 }
2887 else if (max_retention_set)
2888 {
2891 errmsg("max_retention_duration is ineffective when retain_dead_tuples is disabled"));
2892 }
2893}
2894
2895/*
2896 * Return true iff 'rv' is a member of the list.
2897 */
2898static bool
2900{
2902 {
2903 if (equal(relinfo->rv, rv))
2904 return true;
2905 }
2906
2907 return false;
2908}
2909
2910/*
2911 * Get the list of tables and sequences which belong to specified publications
2912 * on the publisher connection.
2913 *
2914 * Note that we don't support the case where the column list is different for
2915 * the same table in different publications to avoid sending unwanted column
2916 * information for some of the rows. This can happen when both the column
2917 * list and row filter are specified for different publications.
2918 */
2919static List *
2921{
2922 WalRcvExecResult *res;
2923 StringInfoData cmd;
2924 TupleTableSlot *slot;
2928 bool check_columnlist = (server_version >= 150000);
2929 int column_count = check_columnlist ? 4 : 3;
2930 StringInfoData pub_names;
2931
2932 initStringInfo(&cmd);
2933 initStringInfo(&pub_names);
2934
2935 /* Build the pub_names comma-separated string. */
2936 GetPublicationsStr(publications, &pub_names, true);
2937
2938 /* Get the list of relations from the publisher */
2939 if (server_version >= 160000)
2940 {
2942
2943 /*
2944 * From version 16, we allowed passing multiple publications to the
2945 * function pg_get_publication_tables. This helped to filter out the
2946 * partition table whose ancestor is also published in this
2947 * publication array.
2948 *
2949 * Join pg_get_publication_tables with pg_publication to exclude
2950 * non-existing publications.
2951 *
2952 * Note that attrs are always stored in sorted order so we don't need
2953 * to worry if different publications have specified them in a
2954 * different order. See pub_collist_validate.
2955 */
2956 appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, c.relkind, gpt.attrs\n"
2957 " FROM pg_class c\n"
2958 " JOIN pg_namespace n ON n.oid = c.relnamespace\n"
2959 " JOIN ( SELECT (pg_get_publication_tables(VARIADIC array_agg(pubname::text))).*\n"
2960 " FROM pg_publication\n"
2961 " WHERE pubname IN ( %s )) AS gpt\n"
2962 " ON gpt.relid = c.oid\n",
2963 pub_names.data);
2964
2965 /* From version 19, inclusion of sequences in the target is supported */
2966 if (server_version >= 190000)
2967 appendStringInfo(&cmd,
2968 "UNION ALL\n"
2969 " SELECT DISTINCT s.schemaname, s.sequencename, " CppAsString2(RELKIND_SEQUENCE) "::\"char\" AS relkind, NULL::int2vector AS attrs\n"
2970 " FROM pg_catalog.pg_publication_sequences s\n"
2971 " WHERE s.pubname IN ( %s )",
2972 pub_names.data);
2973 }
2974 else
2975 {
2977 appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename, " CppAsString2(RELKIND_RELATION) "::\"char\" AS relkind \n");
2978
2979 /* Get column lists for each relation if the publisher supports it */
2980 if (check_columnlist)
2981 appendStringInfoString(&cmd, ", t.attnames\n");
2982
2983 appendStringInfo(&cmd, "FROM pg_catalog.pg_publication_tables t\n"
2984 " WHERE t.pubname IN ( %s )",
2985 pub_names.data);
2986 }
2987
2988 pfree(pub_names.data);
2989
2991 pfree(cmd.data);
2992
2993 if (res->status != WALRCV_OK_TUPLES)
2994 ereport(ERROR,
2996 errmsg("could not receive list of replicated tables from the publisher: %s",
2997 res->err)));
2998
2999 /* Process tables. */
3001 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3002 {
3003 char *nspname;
3004 char *relname;
3005 bool isnull;
3006 char relkind;
3008
3009 nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
3010 Assert(!isnull);
3011 relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
3012 Assert(!isnull);
3013 relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
3014 Assert(!isnull);
3015
3016 relinfo->rv = makeRangeVar(nspname, relname, -1);
3017 relinfo->relkind = relkind;
3018
3019 if (relkind != RELKIND_SEQUENCE &&
3022 ereport(ERROR,
3024 errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
3025 nspname, relname));
3026 else
3028
3029 ExecClearTuple(slot);
3030 }
3032
3034
3035 return relationlist;
3036}
3037
3038/*
3039 * This is to report the connection failure while dropping replication slots.
3040 * Here, we report the WARNING for all tablesync slots so that user can drop
3041 * them manually, if required.
3042 */
3043static void
3044ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
3045{
3046 ListCell *lc;
3047
3048 foreach(lc, rstates)
3049 {
3051 Oid relid = rstate->relid;
3052
3053 /* Only cleanup resources of tablesync workers */
3054 if (!OidIsValid(relid))
3055 continue;
3056
3057 /*
3058 * Caller needs to ensure that relstate doesn't change underneath us.
3059 * See DropSubscription where we get the relstates.
3060 */
3061 if (rstate->state != SUBREL_STATE_SYNCDONE)
3062 {
3063 char syncslotname[NAMEDATALEN] = {0};
3064
3066 sizeof(syncslotname));
3067 elog(WARNING, "could not drop tablesync replication slot \"%s\"",
3068 syncslotname);
3069 }
3070 }
3071
3072 ereport(ERROR,
3074 errmsg("could not connect to publisher when attempting to drop replication slot \"%s\": %s",
3075 slotname, err),
3076 /* translator: %s is an SQL ALTER command */
3077 errhint("Use %s to disable the subscription, and then use %s to disassociate it from the slot.",
3078 "ALTER SUBSCRIPTION ... DISABLE",
3079 "ALTER SUBSCRIPTION ... SET (slot_name = NONE)")));
3080}
3081
3082/*
3083 * Check for duplicates in the given list of publications and error out if
3084 * found one. Add publications to datums as text datums, if datums is not
3085 * NULL.
3086 */
3087static void
3089{
3090 ListCell *cell;
3091 int j = 0;
3092
3093 foreach(cell, publist)
3094 {
3095 char *name = strVal(lfirst(cell));
3096 ListCell *pcell;
3097
3098 foreach(pcell, publist)
3099 {
3100 char *pname = strVal(lfirst(pcell));
3101
3102 if (pcell == cell)
3103 break;
3104
3105 if (strcmp(name, pname) == 0)
3106 ereport(ERROR,
3108 errmsg("publication name \"%s\" used more than once",
3109 pname)));
3110 }
3111
3112 if (datums)
3113 datums[j++] = CStringGetTextDatum(name);
3114 }
3115}
3116
3117/*
3118 * Merge current subscription's publications and user-specified publications
3119 * from ADD/DROP PUBLICATIONS.
3120 *
3121 * If addpub is true, we will add the list of publications into oldpublist.
3122 * Otherwise, we will delete the list of publications from oldpublist. The
3123 * returned list is a copy, oldpublist itself is not changed.
3124 *
3125 * subname is the subscription name, for error messages.
3126 */
3127static List *
3129{
3130 ListCell *lc;
3131
3133
3135
3136 foreach(lc, newpublist)
3137 {
3138 char *name = strVal(lfirst(lc));
3139 ListCell *lc2;
3140 bool found = false;
3141
3142 foreach(lc2, oldpublist)
3143 {
3144 char *pubname = strVal(lfirst(lc2));
3145
3146 if (strcmp(name, pubname) == 0)
3147 {
3148 found = true;
3149 if (addpub)
3150 ereport(ERROR,
3152 errmsg("publication \"%s\" is already in subscription \"%s\"",
3153 name, subname)));
3154 else
3156
3157 break;
3158 }
3159 }
3160
3161 if (addpub && !found)
3163 else if (!addpub && !found)
3164 ereport(ERROR,
3166 errmsg("publication \"%s\" is not in subscription \"%s\"",
3167 name, subname)));
3168 }
3169
3170 /*
3171 * XXX Probably no strong reason for this, but for now it's to make ALTER
3172 * SUBSCRIPTION ... DROP PUBLICATION consistent with SET PUBLICATION.
3173 */
3174 if (!oldpublist)
3175 ereport(ERROR,
3177 errmsg("cannot drop all the publications from a subscription")));
3178
3179 return oldpublist;
3180}
3181
3182/*
3183 * Extract the streaming mode value from a DefElem. This is like
3184 * defGetBoolean() but also accepts the special value of "parallel".
3185 */
3186char
3188{
3189 /*
3190 * If no parameter value given, assume "true" is meant.
3191 */
3192 if (!def->arg)
3193 return LOGICALREP_STREAM_ON;
3194
3195 /*
3196 * Allow 0, 1, "false", "true", "off", "on" or "parallel".
3197 */
3198 switch (nodeTag(def->arg))
3199 {
3200 case T_Integer:
3201 switch (intVal(def->arg))
3202 {
3203 case 0:
3204 return LOGICALREP_STREAM_OFF;
3205 case 1:
3206 return LOGICALREP_STREAM_ON;
3207 default:
3208 /* otherwise, error out below */
3209 break;
3210 }
3211 break;
3212 default:
3213 {
3214 char *sval = defGetString(def);
3215
3216 /*
3217 * The set of strings accepted here should match up with the
3218 * grammar's opt_boolean_or_string production.
3219 */
3220 if (pg_strcasecmp(sval, "false") == 0 ||
3221 pg_strcasecmp(sval, "off") == 0)
3222 return LOGICALREP_STREAM_OFF;
3223 if (pg_strcasecmp(sval, "true") == 0 ||
3224 pg_strcasecmp(sval, "on") == 0)
3225 return LOGICALREP_STREAM_ON;
3226 if (pg_strcasecmp(sval, "parallel") == 0)
3228 }
3229 break;
3230 }
3231
3232 ereport(ERROR,
3234 errmsg("%s requires a Boolean value or \"parallel\"",
3235 def->defname)));
3236 return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
3237}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5286
void check_can_set_role(Oid member, Oid role)
Definition acl.c:5343
AclResult
Definition acl.h:182
@ ACLCHECK_OK
Definition acl.h:183
@ ACLCHECK_NOT_OWNER
Definition acl.h:185
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
Definition aclchk.c:2654
AclResult object_aclcheck(Oid classid, Oid objectid, Oid roleid, AclMode mode)
Definition aclchk.c:3854
bool object_ownercheck(Oid classid, Oid objectid, Oid roleid)
Definition aclchk.c:4108
ArrayType * construct_array_builtin(Datum *elems, int nelems, Oid elmtype)
void LogicalRepWorkersWakeupAtCommit(Oid subid)
Definition worker.c:6296
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
Definition worker.c:644
static Datum values[MAXATTR]
Definition bootstrap.c:147
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define TextDatumGetCString(d)
Definition builtins.h:99
#define NameStr(name)
Definition c.h:777
#define Assert(condition)
Definition c.h:885
uint32 bits32
Definition c.h:567
#define CppAsString2(x)
Definition c.h:440
int32_t int32
Definition c.h:554
#define OidIsValid(objectId)
Definition c.h:800
Oid GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn)
Definition catalog.c:448
bool track_commit_timestamp
Definition commit_ts.c:109
int32 defGetInt32(DefElem *def)
Definition define.c:148
char * defGetString(DefElem *def)
Definition define.c:34
bool defGetBoolean(DefElem *def)
Definition define.c:93
void errorConflictingDefElem(DefElem *defel, ParseState *pstate)
Definition define.c:370
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
int errcode(int sqlerrcode)
Definition elog.c:874
int errmsg(const char *fmt,...)
Definition elog.c:1093
#define LOG
Definition elog.h:31
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:372
#define WARNING
Definition elog.h:36
int int int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...) pg_attribute_printf(1
#define PG_END_TRY(...)
Definition elog.h:397
#define DEBUG1
Definition elog.h:30
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define NOTICE
Definition elog.h:35
#define PG_FINALLY(...)
Definition elog.h:389
#define ereport(elevel,...)
Definition elog.h:150
int errdetail_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...) pg_attribute_printf(1
bool equal(const void *a, const void *b)
Definition equalfuncs.c:223
void err(int eval, const char *fmt,...)
Definition err.c:43
void EventTriggerSQLDropAddObject(const ObjectAddress *object, bool original, bool normal)
void CheckSubscriptionRelkind(char localrelkind, char remoterelkind, const char *nspname, const char *relname)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
Definition execTuples.c:86
#define palloc_object(type)
Definition fe_memutils.h:74
#define palloc_array(type, count)
Definition fe_memutils.h:76
#define DirectFunctionCall1(func, arg1)
Definition fmgr.h:684
Oid MyDatabaseId
Definition globals.c:94
bool parse_int(const char *value, int *result, int flags, const char **hintmsg)
Definition guc.c:2743
int set_config_option(const char *name, const char *value, GucContext context, GucSource source, GucAction action, bool changeVal, int elevel, bool is_reload)
Definition guc.c:3216
@ GUC_ACTION_SET
Definition guc.h:203
@ PGC_S_TEST
Definition guc.h:125
@ PGC_BACKEND
Definition guc.h:77
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition heaptuple.c:1210
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
#define stmt
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
Definition indexing.c:313
void CatalogTupleInsert(Relation heapRel, HeapTuple tup)
Definition indexing.c:233
void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid)
Definition indexing.c:365
long val
Definition informix.c:689
int j
Definition isn.c:78
int i
Definition isn.c:77
List * logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
Definition launcher.c:293
void ApplyLauncherWakeupAtCommit(void)
Definition launcher.c:1184
void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
Definition launcher.c:652
void ApplyLauncherForgetWorkerStartTime(Oid subid)
Definition launcher.c:1154
List * lappend(List *list, void *datum)
Definition list.c:339
List * list_delete(List *list, void *datum)
Definition list.c:853
List * list_append_unique(List *list, void *datum)
Definition list.c:1343
List * list_copy(const List *oldlist)
Definition list.c:1573
void list_free(List *list)
Definition list.c:1546
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
Definition lmgr.c:1088
#define NoLock
Definition lockdefs.h:34
#define AccessExclusiveLock
Definition lockdefs.h:43
#define AccessShareLock
Definition lockdefs.h:36
#define RowExclusiveLock
Definition lockdefs.h:38
char * get_rel_name(Oid relid)
Definition lsyscache.c:2078
char * get_database_name(Oid dbid)
Definition lsyscache.c:1242
char get_rel_relkind(Oid relid)
Definition lsyscache.c:2153
Oid get_rel_namespace(Oid relid)
Definition lsyscache.c:2102
char * get_namespace_name(Oid nspid)
Definition lsyscache.c:3518
RangeVar * makeRangeVar(char *schemaname, char *relname, int location)
Definition makefuncs.c:473
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
void * palloc(Size size)
Definition mcxt.c:1387
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
Oid GetUserId(void)
Definition miscinit.c:469
Datum namein(PG_FUNCTION_ARGS)
Definition name.c:48
#define RangeVarGetRelid(relation, lockmode, missing_ok)
Definition namespace.h:98
#define nodeTag(nodeptr)
Definition nodes.h:139
#define InvokeObjectPostCreateHook(classId, objectId, subId)
#define InvokeObjectPostAlterHook(classId, objectId, subId)
#define InvokeObjectDropHook(classId, objectId, subId)
#define ObjectAddressSet(addr, class_id, object_id)
int oid_cmp(const void *p1, const void *p2)
Definition oid.c:287
ReplOriginId replorigin_create(const char *roname)
Definition origin.c:262
ReplOriginId replorigin_by_name(const char *roname, bool missing_ok)
Definition origin.c:231
XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush)
Definition origin.c:1047
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
Definition origin.c:447
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
@ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION
@ ALTER_SUBSCRIPTION_ENABLED
@ ALTER_SUBSCRIPTION_DROP_PUBLICATION
@ ALTER_SUBSCRIPTION_SET_PUBLICATION
@ ALTER_SUBSCRIPTION_REFRESH_SEQUENCES
@ ALTER_SUBSCRIPTION_SKIP
@ ALTER_SUBSCRIPTION_OPTIONS
@ ALTER_SUBSCRIPTION_CONNECTION
@ ALTER_SUBSCRIPTION_ADD_PUBLICATION
@ OBJECT_DATABASE
@ OBJECT_SUBSCRIPTION
#define ACL_CREATE
Definition parsenodes.h:85
static AmcheckOptions opts
Definition pg_amcheck.c:112
NameData relname
Definition pg_class.h:40
#define NAMEDATALEN
static int server_version
Definition pg_dumpall.c:121
#define lfirst(lc)
Definition pg_list.h:172
static int list_length(const List *l)
Definition pg_list.h:152
#define NIL
Definition pg_list.h:68
#define foreach_delete_current(lst, var_or_cell)
Definition pg_list.h:391
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
Datum pg_lsn_in(PG_FUNCTION_ARGS)
Definition pg_lsn.c:64
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static XLogRecPtr DatumGetLSN(Datum X)
Definition pg_lsn.h:25
void changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
void deleteSharedDependencyRecordsFor(Oid classId, Oid objectId, int32 objectSubId)
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
void RemoveSubscriptionRel(Oid subid, Oid relid)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock)
Subscription * GetSubscription(Oid subid, bool missing_ok)
List * GetSubscriptionRelations(Oid subid, bool tables, bool sequences, bool not_ready)
NameData subname
END_CATALOG_STRUCT typedef FormData_pg_subscription * Form_pg_subscription
void pgstat_drop_subscription(Oid subid)
void pgstat_create_subscription(Oid subid)
int pg_strcasecmp(const char *s1, const char *s2)
#define qsort(a, b, c, d)
Definition port.h:495
static bool DatumGetBool(Datum X)
Definition postgres.h:100
static Datum PointerGetDatum(const void *X)
Definition postgres.h:352
static Name DatumGetName(Datum X)
Definition postgres.h:390
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:262
uint64_t Datum
Definition postgres.h:70
static char DatumGetChar(Datum X)
Definition postgres.h:122
static Datum CStringGetDatum(const char *X)
Definition postgres.h:380
static Datum Int32GetDatum(int32 X)
Definition postgres.h:222
static Datum CharGetDatum(char X)
Definition postgres.h:132
#define InvalidOid
unsigned int Oid
static int fb(int x)
#define RelationGetDescr(relation)
Definition rel.h:540
const char * quote_identifier(const char *ident)
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition slot.c:267
#define ERRCODE_DUPLICATE_OBJECT
Definition streamutil.c:30
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void appendStringInfoChar(StringInfo str, char ch)
Definition stringinfo.c:242
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
char * defname
Definition parsenodes.h:844
Node * arg
Definition parsenodes.h:845
Definition pg_list.h:54
LogicalRepWorkerType type
char * relname
Definition primnodes.h:84
char * schemaname
Definition primnodes.h:81
bits32 specified_opts
bool retaindeadtuples
char * wal_receiver_timeout
char * synchronous_commit
int32 maxretention
char * slot_name
XLogRecPtr lsn
bool passwordrequired
Tuplestorestate * tuplestore
TupleDesc tupledesc
WalRcvExecStatus status
void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
#define SUBOPT_STREAMING
char defGetStreamingMode(DefElem *def)
#define SUBOPT_CREATE_SLOT
#define SUBOPT_PASSWORD_REQUIRED
ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel)
#define SUBOPT_SYNCHRONOUS_COMMIT
#define SUBOPT_ENABLED
static void check_duplicates_in_publist(List *publist, Datum *datums)
static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel)
#define SUBOPT_RETAIN_DEAD_TUPLES
#define SUBOPT_ORIGIN
static Datum publicationListToArray(List *publist)
#define SUBOPT_FAILOVER
static void check_publications_origin_sequences(WalReceiverConn *wrconn, List *publications, bool copydata, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static void parse_subscription_options(ParseState *pstate, List *stmt_options, bits32 supported_opts, SubOpts *opts)
static void check_publications(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_RUN_AS_OWNER
#define SUBOPT_SLOT_NAME
static List * fetch_relation_list(WalReceiverConn *wrconn, List *publications)
#define SUBOPT_COPY_DATA
#define SUBOPT_TWOPHASE_COMMIT
static void AlterSubscription_refresh(Subscription *sub, bool copy_data, List *validate_publications)
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err)
#define SUBOPT_DISABLE_ON_ERR
void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, int elevel_for_sub_disabled, bool retain_dead_tuples, bool retention_active, bool max_retention_set)
static void AlterSubscription_refresh_seq(Subscription *sub)
static void AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId)
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
#define SUBOPT_LSN
#define SUBOPT_MAX_RETENTION_DURATION
static List * merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname)
#define SUBOPT_WAL_RECEIVER_TIMEOUT
static void check_publications_origin_tables(WalReceiverConn *wrconn, List *publications, bool copydata, bool retain_dead_tuples, char *origin, Oid *subrel_local_oids, int subrel_count, char *subname)
static bool list_member_rangevar(const List *list, RangeVar *rv)
#define SUBOPT_BINARY
#define IsSet(val, bits)
#define SUBOPT_REFRESH
#define SUBOPT_CONNECT
static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel)
bool superuser_arg(Oid roleid)
Definition superuser.c:57
bool superuser(void)
Definition superuser.c:47
void ReleaseSysCache(HeapTuple tuple)
Definition syscache.c:264
HeapTuple SearchSysCache2(SysCacheIdentifier cacheId, Datum key1, Datum key2)
Definition syscache.c:230
Datum SysCacheGetAttrNotNull(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber)
Definition syscache.c:625
Datum SysCacheGetAttr(SysCacheIdentifier cacheId, HeapTuple tup, AttrNumber attributeNumber, bool *isNull)
Definition syscache.c:595
#define SearchSysCacheCopy1(cacheId, key1)
Definition syscache.h:91
#define SearchSysCacheCopy2(cacheId, key1, key2)
Definition syscache.h:93
#define GetSysCacheOid2(cacheId, oidcol, key1, key2)
Definition syscache.h:111
void table_close(Relation relation, LOCKMODE lockmode)
Definition table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition table.c:40
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
Definition tablesync.c:1202
void UpdateTwoPhaseState(Oid suboid, char new_state)
Definition tablesync.c:1647
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition tuptable.h:398
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
Definition tuptable.h:457
bool LookupGXactBySubid(Oid subid)
Definition twophase.c:2797
String * makeString(char *str)
Definition value.c:63
#define intVal(v)
Definition value.h:79
#define strVal(v)
Definition value.h:82
const char * name
static WalReceiverConn * wrconn
Definition walreceiver.c:94
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
@ WALRCV_OK_COMMAND
@ WALRCV_ERROR
@ WALRCV_OK_TUPLES
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23
@ WORKERTYPE_TABLESYNC
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3669
int wal_level
Definition xlog.c:134
@ WAL_LEVEL_REPLICA
Definition xlog.h:76
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint16 ReplOriginId
Definition xlogdefs.h:69
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28