PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pg_createsubscriber.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * pg_createsubscriber.c
4 * Create a new logical replica from a standby server
5 *
6 * Copyright (c) 2024-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/bin/pg_basebackup/pg_createsubscriber.c
10 *
11 *-------------------------------------------------------------------------
12 */
13
14#include "postgres_fe.h"
15
16#include <sys/stat.h>
17#include <sys/time.h>
18#include <sys/wait.h>
19#include <time.h>
20
21#include "common/connect.h"
23#include "common/file_utils.h"
24#include "common/logging.h"
25#include "common/pg_prng.h"
27#include "datatype/timestamp.h"
31#include "fe_utils/version.h"
32#include "getopt_long.h"
33
/* Default subscriber port number for -p/--subscriber-port (see usage()). */
34#define DEFAULT_SUB_PORT "50432"
/*
 * Object-type bit for LogicalRepInfos.objecttypes_to_clean.  Presumably set
 * when --clean=publications is given -- TODO confirm against the option
 * parsing code, which is not visible here.
 */
35#define OBJECTTYPE_PUBLICATIONS 0x0001
36
37/*
38 * Configuration files for recovery parameters.
39 *
40 * The recovery parameters are set in INCLUDED_CONF_FILE, itself loaded by
41 * the server through an include_if_exists in postgresql.auto.conf.
42 *
43 * INCLUDED_CONF_FILE is renamed to INCLUDED_CONF_FILE_DISABLED when exiting,
44 * so that the recovery parameters set by this tool never take effect on node
45 * restart. The contents of INCLUDED_CONF_FILE_DISABLED can be useful for
46 * debugging.
47 */
48#define PG_AUTOCONF_FILENAME "postgresql.auto.conf"
49#define INCLUDED_CONF_FILE "pg_createsubscriber.conf"
50#define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled"
51
/*
 * NOTE(review): original line 53 (the struct tag line) is missing from this
 * copy.  The prototypes in this file refer to "struct CreateSubscriberOptions",
 * so that is presumably the tag -- restore from upstream before building.
 */
52/* Command-line options */
54{
55 char *config_file; /* configuration file */
56 char *pub_conninfo_str; /* publisher connection string */
57 char *socket_dir; /* directory for Unix-domain socket, if any */
58 char *sub_port; /* subscriber port number */
59 const char *sub_username; /* subscriber username */
60 bool two_phase; /* enable-two-phase option */
61 SimpleStringList database_names; /* list of database names */
62 SimpleStringList pub_names; /* list of publication names */
63 SimpleStringList sub_names; /* list of subscription names */
64 SimpleStringList replslot_names; /* list of replication slot names */
65 int recovery_timeout; /* stop recovery after this time (seconds, per -t) */
66 bool all_dbs; /* -a/--all option */
67 SimpleStringList objecttypes_to_clean; /* list of object types to cleanup */
68};
69
/*
 * NOTE(review): original line 71 is missing from this copy.  It is presumably
 * "struct LogicalRepInfo", the tag used by the prototypes and by
 * cleanup_objects_atexit() -- confirm against upstream.
 */
70/* per-database publication/subscription info */
72{
73 char *dbname; /* database name */
74 char *pubconninfo; /* publisher connection string */
75 char *subconninfo; /* subscriber connection string */
76 char *pubname; /* publication name */
77 char *subname; /* subscription name */
78 char *replslotname; /* replication slot name */
79
80 bool made_replslot; /* replication slot was created (dropped by
 * cleanup_objects_atexit() on failure) */
81 bool made_publication; /* publication was created (dropped by
 * cleanup_objects_atexit() on failure) */
82};
83
/*
 * NOTE(review): original lines 88 (the struct tag) and 90 are missing from this
 * copy.  Code elsewhere reads "dbinfos.dbinfo[i]", so line 90 presumably
 * declares the per-database "dbinfo" array member -- confirm against upstream.
 */
84/*
85 * Information shared across all the databases (or publications and
86 * subscriptions).
87 */
89{
91 bool two_phase; /* enable-two-phase option */
92 bits32 objecttypes_to_clean; /* flags indicating which object types
93 * to clean up on subscriber */
94};
95
/*
 * Forward declarations of the file-local (static) functions.
 *
 * NOTE(review): original lines 123, 129-130, 143, 147 and 151-153 are missing
 * from this copy, so several prototypes below are truncated (e.g. the ones
 * whose continuation lines appear without a function name).  Restore from
 * upstream before building.
 */
96static void cleanup_objects_atexit(void);
97static void usage(void);
98static char *get_base_conninfo(const char *conninfo, char **dbname);
99static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
100static char *get_exec_path(const char *argv0, const char *progname);
101static void check_data_directory(const char *datadir);
102static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
103static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
104 const char *pub_base_conninfo,
105 const char *sub_base_conninfo);
106static PGconn *connect_database(const char *conninfo, bool exit_on_error);
107static void disconnect_database(PGconn *conn, bool exit_on_error);
108static uint64 get_primary_sysid(const char *conninfo);
109static uint64 get_standby_sysid(const char *datadir);
110static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
111static bool server_is_in_recovery(PGconn *conn);
112static char *generate_object_name(PGconn *conn);
113static void check_publisher(const struct LogicalRepInfo *dbinfo);
114static char *setup_publisher(struct LogicalRepInfo *dbinfo);
115static void check_subscriber(const struct LogicalRepInfo *dbinfo);
116static void setup_subscriber(struct LogicalRepInfo *dbinfo,
117 const char *consistent_lsn);
118static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
119 const char *lsn);
120static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
121 const char *slotname);
122static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
124 struct LogicalRepInfo *dbinfo);
125static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
126 const char *slot_name);
127static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
128static void start_standby_server(const struct CreateSubscriberOptions *opt,
131static void stop_standby_server(const char *datadir);
132static void wait_for_end_recovery(const char *conninfo,
133 const struct CreateSubscriberOptions *opt);
134static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
135static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
136static void drop_publication(PGconn *conn, const char *pubname,
137 const char *dbname, bool *made_publication);
138static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
139static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
140static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
141 const char *lsn);
142static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
144 const struct LogicalRepInfo *dbinfo);
145static void drop_existing_subscription(PGconn *conn, const char *subname,
146 const char *dbname);
148 bool dbnamespecified);
149static void report_createsub_log(enum pg_log_level, enum pg_log_part,
150 const char *pg_restrict fmt,...)
154
/*
 * File-level state shared by the functions below.
 *
 * NOTE(review): original lines 164 and 170 are missing from this copy.  Code
 * elsewhere uses "dbinfos" (e.g. dbinfos.dbinfo[i]) and "prng_state" without a
 * visible declaration, so these lines presumably declare them -- confirm
 * against upstream.
 */
155#define WAIT_INTERVAL 1 /* 1 second */
156
157static const char *progname;
158
/*
 * Standby's primary_slot_name setting; presumably filled in by
 * check_subscriber() when that setting is non-empty.
 */
159static char *primary_slot_name = NULL;
/* Set by -n/--dry-run: just show what would be done. */
160static bool dry_run = false;
161
/* When true at exit, cleanup_objects_atexit() returns right after the config rename. */
162static bool success = false;
163
165static int num_dbs = 0; /* number of specified databases */
166static int num_pubs = 0; /* number of specified publications */
167static int num_subs = 0; /* number of specified subscriptions */
168static int num_replslots = 0; /* number of specified replication slots */
169
171
172static char *pg_ctl_path = NULL;
173static char *pg_resetwal_path = NULL;
174
175/* standby / subscriber data directory */
176static char *subscriber_dir = NULL;
177
/* If set on a failed run, cleanup warns the target can no longer be a physical replica. */
178static bool recovery_ended = false;
/* Checked by cleanup_objects_atexit() on a failed run. */
179static bool standby_running = false;
180static bool recovery_params_set = false;
181
/*
 * NOTE(review): original line 186 is missing from this copy.  It presumably
 * holds the function name and the level/part parameters, matching the
 * prototype "report_createsub_log(enum pg_log_level, enum pg_log_part, ...)".
 */
182/*
183 * Report a message with a given log level
184 */
185static void
187 const char *pg_restrict fmt,...)
188{
189 va_list args;
190
191 va_start(args, fmt);
192
/* Forward the variadic arguments to the common frontend logger. */
193 pg_log_generic_v(level, part, fmt, args);
194
195 va_end(args);
196}
197
/*
 * NOTE(review): original lines 202 (the signature) and 208 (the logging call
 * between va_start and va_end) are missing from this copy.
 */
198/*
199 * Report a fatal error and exit
200 */
201static void
203{
204 va_list args;
205
206 va_start(args, fmt);
207
209
210 va_end(args);
211
/* Never returns: always terminate with exit status 1. */
212 exit(1);
213}
214
/*
 * NOTE(review): this copy is missing original lines 229, 232, 234-235,
 * 237-240, 242, 245, 260, 262, 283, 294, 298, 303, 307 and 315 (the signature,
 * the rename condition and several logging calls among them).  Restore from
 * upstream before building.
 */
215/*
216 * Clean up objects created by pg_createsubscriber.
217 *
218 * Publications and replication slots are created on the primary. Depending
219 * on the step where it failed, already-created objects should be removed if
220 * possible (sometimes this won't work due to a connection issue).
221 * There is no cleanup on the target server *after* its promotion, because any
222 * failure at this point means recreating the physical replica and starting
223 * again.
224 *
225 * The recovery configuration is always removed, by renaming the included
226 * configuration file out of the way.
227 */
228static void
230{
231 /* Rename the included configuration file, if necessary. */
233 {
236
241
243 {
244 /* durable_rename() has already logged something. */
246 "A manual removal of the recovery parameters may be required.");
247 }
248 }
249
	/* On success only the configuration rename above is needed. */
250 if (success)
251 return;
252
253 /*
254 * If the server is promoted, there is no way to use the current setup
255 * again. Warn the user that a new replication setup should be done before
256 * trying again.
257 */
258 if (recovery_ended)
259 {
261 "failed after the end of recovery");
263 "The target server cannot be used as a physical replica anymore. "
264 "You must recreate the physical replica before continuing.");
265 }
266
267 for (int i = 0; i < num_dbs; i++)
268 {
269 struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
270
271 if (dbinfo->made_publication || dbinfo->made_replslot)
272 {
273 PGconn *conn;
274
			/*
			 * Best effort: exit_on_error is false so an unreachable
			 * publisher does not abort the cleanup; leftovers are
			 * reported below instead.
			 */
275 conn = connect_database(dbinfo->pubconninfo, false);
276 if (conn != NULL)
277 {
278 if (dbinfo->made_publication)
279 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
280 &dbinfo->made_publication);
281 if (dbinfo->made_replslot)
282 drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
284 }
285 else
286 {
287 /*
288 * If a connection could not be established, inform the user
289 * that some objects were left on primary and should be
290 * removed before trying again.
291 */
292 if (dbinfo->made_publication)
293 {
295 "publication \"%s\" created in database \"%s\" on primary was left behind",
296 dbinfo->pubname,
297 dbinfo->dbname);
299 "Drop this publication before trying again.");
300 }
301 if (dbinfo->made_replslot)
302 {
304 "replication slot \"%s\" created in database \"%s\" on primary was left behind",
305 dbinfo->replslotname,
306 dbinfo->dbname);
308 "Drop this replication slot soon to avoid retention of WAL files.");
309 }
310 }
311 }
312 }
313
	/*
	 * NOTE(review): the statement guarded by this "if" (original line 315)
	 * is missing from this copy; presumably it stops the standby server.
	 */
314 if (standby_running)
316}
317
/*
 * Print the command-line help text to stdout.
 *
 * Every literal is wrapped in _() so gettext extracts it for translation;
 * keep each printf call and its string intact when editing, because merging
 * or splitting strings changes the translation keys.
 */
318static void
319usage(void)
320{
321 printf(_("%s creates a new logical replica from a standby server.\n\n"),
322 progname);
323 printf(_("Usage:\n"));
324 printf(_(" %s [OPTION]...\n"), progname);
325 printf(_("\nOptions:\n"));
326 printf(_(" -a, --all create subscriptions for all databases except template\n"
327 " databases and databases that don't allow connections\n"));
328 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
329 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
330 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
331 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
332 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
333 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
334 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
335 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
336 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
337 printf(_(" -v, --verbose output verbose messages\n"));
338 printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
339 " databases on the subscriber; accepts: \"%s\"\n"), "publications");
340 printf(_(" --config-file=FILENAME use specified main server configuration\n"
341 " file when running target cluster\n"));
342 printf(_(" --publication=NAME publication name\n"));
343 printf(_(" --replication-slot=NAME replication slot name\n"));
344 printf(_(" --subscription=NAME subscription name\n"));
345 printf(_(" -V, --version output version information, then exit\n"));
346 printf(_(" -?, --help show this help, then exit\n"));
347 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
348 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
349}
350
/*
 * NOTE(review): original lines 359 and 361-362 are missing from this copy.
 * Presumably they append a separator when buf is non-empty, then "=" and the
 * quoted value -- confirm against upstream.
 */
351/*
352 * Subroutine to append "keyword=value" to a connection string,
353 * with proper quoting of the value. (We assume keywords don't need that.)
354 */
355static void
356appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
357{
358 if (buf->len > 0)
360 appendPQExpBufferStr(buf, keyword);
363}
364
/*
 * NOTE(review): original lines 380-382, 389, 391, 395 and 412-413 are missing
 * from this copy (the conn_opts/conn_opt/buf declarations, the error logging
 * call and buffer cleanup among them).
 */
365/*
366 * Validate a connection string. Returns a base connection string that is a
367 * connection string without a database name.
368 *
369 * Since we might process multiple databases, each database name will be
370 * appended to this base connection string to provide a final connection
371 * string. If the second argument (dbname) is not null, returns dbname if the
372 * provided connection string contains it.
373 *
374 * Returns NULL if the connection string cannot be parsed.
375 *
376 * It is the caller's responsibility to free the returned connection string and
377 * dbname.
378 */
377static char *
378get_base_conninfo(const char *conninfo, char **dbname)
379{
383 char *errmsg = NULL;
384 char *ret;
385
386 conn_opts = PQconninfoParse(conninfo, &errmsg);
387 if (conn_opts == NULL)
388 {
390 "could not parse connection string: %s", errmsg);
392 return NULL;
393 }
394
	/*
	 * Copy every option that has a non-empty value, except dbname, which is
	 * handed back through *dbname (when requested) instead.
	 */
396 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
397 {
398 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
399 {
400 if (strcmp(conn_opt->keyword, "dbname") == 0)
401 {
402 if (dbname)
403 *dbname = pg_strdup(conn_opt->val);
404 continue;
405 }
406 appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
407 }
408 }
409
410 ret = pg_strdup(buf->data);
411
414
415 return ret;
416}
417
/*
 * NOTE(review): original lines 423 (the function name/parameter line), 425
 * (presumably the buffer declaration) and 438 are missing from this copy.
 */
418/*
419 * Build a subscriber connection string. Only a few parameters are supported
420 * since it starts a server with restricted access.
421 */
422static char *
424{
426 char *ret;
427
428 appendConnStrItem(buf, "port", opt->sub_port);
	/* The socket directory is passed as "host"; this is skipped on Windows. */
429#if !defined(WIN32)
430 appendConnStrItem(buf, "host", opt->socket_dir);
431#endif
432 if (opt->sub_username != NULL)
433 appendConnStrItem(buf, "user", opt->sub_username);
434 appendConnStrItem(buf, "fallback_application_name", progname);
435
	/* Return a freshly allocated copy; the caller owns it. */
436 ret = pg_strdup(buf->data);
437
439
440 return ret;
441}
442
/*
 * NOTE(review): original lines 456-457 (presumably the lookup that sets ret
 * and exec_path), 464 and 474 are missing from this copy.
 * NOTE(review): the parameter "progname" shadows the file-level global of the
 * same name; inside this function it refers to the binary being looked up.
 */
443/*
444 * Verify if a PostgreSQL binary (progname) is available in the same directory as
445 * pg_createsubscriber and it has the same version. It returns the absolute
446 * path of the progname.
447 */
448static char *
449get_exec_path(const char *argv0, const char *progname)
450{
451 char *versionstr;
452 char *exec_path;
453 int ret;
454
	/* Expected version line, e.g. "pg_ctl (PostgreSQL) <PG_VERSION>\n". */
455 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
458
459 if (ret < 0)
460 {
461 char full_path[MAXPGPATH];
462
463 if (find_my_exec(argv0, full_path) < 0)
465
		/* -1: binary not found; any other negative value: version mismatch. */
466 if (ret == -1)
467 report_createsub_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
468 progname, "pg_createsubscriber", full_path);
469 else
470 report_createsub_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
471 progname, full_path, "pg_createsubscriber");
472 }
473
475 "%s path is: %s", progname, exec_path);
476
477 return exec_path;
478}
479
/*
 * NOTE(review): original lines 486 (signature), 492, 509 (presumably the
 * PG_VERSION read that sets major_version/version_str), 512 and 514 are
 * missing from this copy.
 */
480/*
481 * Is it a cluster directory? These are preliminary checks. It is far from
482 * making an accurate check. If it is not a clone from the publisher, it will
483 * eventually fail in a future step.
484 */
485static void
487{
488 struct stat statbuf;
489 uint32 major_version;
490 char *version_str;
491
493 "checking if directory \"%s\" is a cluster data directory",
494 datadir);
495
	/*
	 * Only existence/accessibility is checked here; in the visible lines
	 * statbuf is not inspected further (e.g. no S_ISDIR test).
	 */
496 if (stat(datadir, &statbuf) != 0)
497 {
498 if (errno == ENOENT)
499 report_createsub_fatal("data directory \"%s\" does not exist", datadir);
500 else
501 report_createsub_fatal("could not access directory \"%s\": %m", datadir);
502 }
503
504 /*
505 * Retrieve the contents of this cluster's PG_VERSION. We require
506 * compatibility with the same major version as the one this tool is
507 * compiled with.
508 */
510 if (major_version != PG_MAJORVERSION_NUM)
511 {
513 "data directory is of wrong version");
515 "File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
516 "PG_VERSION", version_str, PG_MAJORVERSION);
517 exit(1);
518 }
519}
520
/*
 * NOTE(review): original lines 531 (presumably the buffer declaration) and 540
 * (presumably its destruction) are missing from this copy.
 */
521/*
522 * Append database name into a base connection string.
523 *
524 * dbname is the only parameter that changes so it is not included in the base
525 * connection string. This function concatenates dbname to build a "real"
526 * connection string.
527 *
528 * The result is allocated with pg_strdup(); the caller owns it.
529 */
528static char *
529concat_conninfo_dbname(const char *conninfo, const char *dbname)
530{
532 char *ret;
533
534 Assert(conninfo != NULL);
535
536 appendPQExpBufferStr(buf, conninfo);
	/* dbname goes through appendConnStrItem() so its value gets quoted. */
537 appendConnStrItem(buf, "dbname", dbname);
538
539 ret = pg_strdup(buf->data);
541
542 return ret;
543}
544
/*
 * NOTE(review): original lines 553 (function name/first parameter), 558-560
 * (presumably the pubcell/subcell/replslotcell declarations), 570, 599, 604
 * and 615 are missing from this copy.
 */
545/*
546 * Store publication and subscription information.
547 *
548 * If publication, replication slot and subscription names were specified,
549 * store it here. Otherwise, a generated name will be assigned to the object in
550 * setup_publisher().
551 *
552 * dbname and any user-specified names point into the option lists in opt;
553 * they are not copied.  Only the two connection strings are newly allocated.
554 */
552static struct LogicalRepInfo *
554 const char *pub_base_conninfo,
555 const char *sub_base_conninfo)
556{
557 struct LogicalRepInfo *dbinfo;
561 int i = 0;
562
	/*
	 * Sized by num_dbs but filled by walking opt->database_names; assumes
	 * the list holds exactly num_dbs entries -- TODO confirm in the caller.
	 */
563 dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
564
	/*
	 * Name lists, when given, are consumed in step with database_names;
	 * presumably the caller checks they have the same length -- verify.
	 */
565 if (num_pubs > 0)
566 pubcell = opt->pub_names.head;
567 if (num_subs > 0)
568 subcell = opt->sub_names.head;
569 if (num_replslots > 0)
571
572 for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
573 {
574 char *conninfo;
575
576 /* Fill publisher attributes */
577 conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
578 dbinfo[i].pubconninfo = conninfo;
579 dbinfo[i].dbname = cell->val;
580 if (num_pubs > 0)
581 dbinfo[i].pubname = pubcell->val;
582 else
583 dbinfo[i].pubname = NULL;
584 if (num_replslots > 0)
585 dbinfo[i].replslotname = replslotcell->val;
586 else
587 dbinfo[i].replslotname = NULL;
588 dbinfo[i].made_replslot = false;
589 dbinfo[i].made_publication = false;
590 /* Fill subscriber attributes */
591 conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
592 dbinfo[i].subconninfo = conninfo;
593 if (num_subs > 0)
594 dbinfo[i].subname = subcell->val;
595 else
596 dbinfo[i].subname = NULL;
597 /* Other fields will be filled later */
598
600 "publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
601 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
602 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
603 dbinfo[i].pubconninfo);
605 "subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
606 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
607 dbinfo[i].subconninfo,
608 dbinfos.two_phase ? "true" : "false");
609
610 if (num_pubs > 0)
611 pubcell = pubcell->next;
612 if (num_subs > 0)
613 subcell = subcell->next;
614 if (num_replslots > 0)
616
617 i++;
618 }
619
620 return dbinfo;
621}
622
/*
 * NOTE(review): original lines 634 (presumably the connection-status test),
 * 636, 638, 647-648 (presumably the search_path query), 650 and 652 are
 * missing from this copy.
 */
623/*
624 * Open a new connection and secure its search_path. On failure, exit(1) if
625 * exit_on_error is true; otherwise return NULL.
626 */
627static PGconn *
628connect_database(const char *conninfo, bool exit_on_error)
629{
630 PGconn *conn;
631 PGresult *res;
632
633 conn = PQconnectdb(conninfo);
635 {
637 "connection to database failed: %s",
639 PQfinish(conn);
640
641 if (exit_on_error)
642 exit(1);
643 return NULL;
644 }
645
646 /* Secure search_path */
649 {
651 "could not clear \"search_path\": %s",
653 PQclear(res);
654 PQfinish(conn);
655
656 if (exit_on_error)
657 exit(1);
658 return NULL;
659 }
660 PQclear(res);
661
662 return conn;
663}
664
665/*
666 * Close the connection. If exit_on_error is true, it has an undesired
667 * condition and it should exit immediately.
668 */
669static void
670disconnect_database(PGconn *conn, bool exit_on_error)
671{
672 Assert(conn != NULL);
673
674 PQfinish(conn);
675
676 if (exit_on_error)
677 exit(1);
678}
679
/*
 * NOTE(review): original lines 689 (presumably the sysid declaration), 691,
 * 697, 699, 701-702, 709, 714 and 718 are missing from this copy.
 */
680/*
681 * Obtain the system identifier of the publisher, opening a new connection
682 * from conninfo (exiting on connection failure). It will be used to compare
683 * if a data directory is a clone of another one.
684 */
684static uint64
685get_primary_sysid(const char *conninfo)
686{
687 PGconn *conn;
688 PGresult *res;
690
692 "getting system identifier from publisher");
693
694 conn = connect_database(conninfo, true);
695
696 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
698 {
700 "could not get system identifier: %s",
703 }
704 if (PQntuples(res) != 1)
705 {
707 "could not get system identifier: got %d rows, expected %d row",
708 PQntuples(res), 1);
710 }
711
	/* The identifier is returned as text; parse it as a base-10 uint64. */
712 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
713
715 "system identifier is %" PRIu64 " on publisher", sysid);
716
717 PQclear(res);
719
720 return sysid;
721}
722
/*
 * NOTE(review): original lines 729 (signature), 731, 733, 735, 738
 * (presumably the control-file read that sets cf and crc_ok) and 744 are
 * missing from this copy.
 */
723/*
724 * Obtain the system identifier from control file. It will be used to compare
725 * if a data directory is a clone of another one. This routine is used locally
726 * and avoids a connection.
727 */
728static uint64
730{
732 bool crc_ok;
734
736 "getting system identifier from subscriber");
737
	/* A bad CRC means the identifier cannot be trusted: fail hard. */
739 if (!crc_ok)
740 report_createsub_fatal("control file appears to be corrupt");
741
742 sysid = cf->system_identifier;
743
745 "system identifier is %" PRIu64 " on subscriber", sysid);
746
747 pg_free(cf);
748
749 return sysid;
750}
751
/*
 * NOTE(review): original lines 758 (signature), 760, 766, 769, 784, 789-790
 * (presumably the control-file update), 796, 799, 803 (the rest of the
 * psprintf arguments), 805 and 813 are missing from this copy.
 */
752/*
753 * Modify the system identifier. Since a standby server preserves the system
754 * identifier, it makes sense to change it to avoid situations in which WAL
755 * files from one of the systems might be used in the other one.
756 */
757static void
759{
761 bool crc_ok;
762 struct timeval tv;
763
764 char *cmd_str;
765
767 "modifying system identifier of subscriber");
768
770 if (!crc_ok)
771 report_createsub_fatal("control file appears to be corrupt");
772
773 /*
774 * Select a new system identifier.
775 *
776 * XXX this code was extracted from BootStrapXLOG().
777 *
778 * Layout: seconds in the upper 32 bits, microseconds (< 2^20) in bits
779 * 12-31, and the low 12 bits of the PID in bits 0-11.
780 */
778 gettimeofday(&tv, NULL);
779 cf->system_identifier = ((uint64) tv.tv_sec) << 32;
780 cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
781 cf->system_identifier |= getpid() & 0xFFF;
782
783 if (dry_run)
785 "dry-run: would set system identifier to %" PRIu64 " on subscriber",
786 cf->system_identifier);
787 else
788 {
791 "system identifier is %" PRIu64 " on subscriber",
792 cf->system_identifier);
793 }
794
795 if (dry_run)
797 "dry-run: would run pg_resetwal on the subscriber");
798 else
800 "running pg_resetwal on the subscriber");
801
	/*
	 * Shell command: both paths are double-quoted and stdout is redirected
	 * to a file (the remaining arguments are on a missing line).
	 * NOTE(review): cmd_str is not visibly freed in this fragment -- confirm.
	 */
802 cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
804
806 "pg_resetwal command is: %s", cmd_str);
807
808 if (!dry_run)
809 {
810 int rc = system(cmd_str);
811
812 if (rc == 0)
814 "successfully reset WAL on the subscriber");
815 else
816 report_createsub_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
817 }
818
819 pg_free(cf);
820}
821
/*
 * NOTE(review): original lines 828 (signature), 838, 840, 842-843, 851 and
 * 860 (presumably the random-number draw that sets rand) are missing from
 * this copy.
 * NOTE(review): the local variable "rand" shadows the C library's rand().
 */
822/*
823 * Generate an object name using a prefix, database oid and a random integer.
824 * It is used in case the user does not specify an object name (publication,
825 * subscription, replication slot).
826 */
827static char *
829{
830 PGresult *res;
831 Oid oid;
832 uint32 rand;
833 char *objname;
834
835 res = PQexec(conn,
836 "SELECT oid FROM pg_catalog.pg_database "
837 "WHERE datname = pg_catalog.current_database()");
839 {
841 "could not obtain database OID: %s",
844 }
845
846 if (PQntuples(res) != 1)
847 {
849 "could not obtain database OID: got %d rows, expected %d row",
850 PQntuples(res), 1);
852 }
853
854 /* Database OID */
855 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
856
857 PQclear(res);
858
859 /* Random unsigned integer */
861
862 /*
863 * Build the object name. The name must not exceed NAMEDATALEN - 1. This
864 * current schema uses a maximum of 40 bytes: 20 for the prefix, up to 10
865 * for the %u OID, 1 for '_', up to 8 for the %x random value, and 1 for
866 * the terminating '\0'.
866 */
867 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
868
869 return objname;
870}
871
/*
 * NOTE(review): original lines 878 (presumably the query buffer declaration),
 * 883, 886, 888, 890, 893 and 900-901 are missing from this copy.
 */
872/*
873 * Does the publication exist in the specified database?
874 *
875 * In the visible lines dbname is used only in the error message.
874 */
875static bool
876find_publication(PGconn *conn, const char *pubname, const char *dbname)
877{
879 PGresult *res;
880 bool found = false;
	/*
	 * pubname is escaped as a SQL literal before being interpolated into the
	 * query.  NOTE(review): PQescapeLiteral() returns NULL on failure; no
	 * check of pubname_esc is visible here -- confirm.
	 */
881 char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
882
884 "SELECT 1 FROM pg_catalog.pg_publication "
885 "WHERE pubname = %s",
887 res = PQexec(conn, str->data);
889 {
891 "could not find publication \"%s\" in database \"%s\": %s",
892 pubname, dbname, PQerrorMessage(conn));
894 }
895
896 if (PQntuples(res) == 1)
897 found = true;
898
899 PQclear(res);
902
903 return found;
904}
905
/*
 * NOTE(review): original lines 913 (signature), 934 (presumably the
 * assignment of genname), 945, 983, 985, 987-988 and 993 are missing from
 * this copy.  genname is not visibly freed in this fragment -- confirm.
 */
906/*
907 * Create the publications and replication slots in preparation for logical
908 * replication. Returns the LSN from latest replication slot. It will be the
909 * replication start point that is used to adjust the subscriptions (see
910 * set_replication_progress).
911 */
912static char *
914{
915 char *lsn = NULL;
916
	/* Seed the PRNG, presumably used for random suffixes of generated names. */
917 pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
918
919 for (int i = 0; i < num_dbs; i++)
920 {
921 PGconn *conn;
922 char *genname = NULL;
923
924 conn = connect_database(dbinfo[i].pubconninfo, true);
925
926 /*
927 * If an object name was not specified as command-line options, assign
928 * a generated object name. The replication slot has a different rule.
929 * The subscription name is assigned to the replication slot name if
930 * no replication slot is specified. It follows the same rule as
931 * CREATE SUBSCRIPTION.
932 */
933 if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
935 if (num_pubs == 0)
936 dbinfo[i].pubname = pg_strdup(genname);
937 if (num_subs == 0)
938 dbinfo[i].subname = pg_strdup(genname);
939 if (num_replslots == 0)
940 dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
941
942 if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
943 {
944 /* Reuse existing publication on publisher. */
946 "use existing publication \"%s\" in database \"%s\"",
947 dbinfo[i].pubname, dbinfo[i].dbname);
948 /* Don't remove pre-existing publication if an error occurs. */
949 dbinfo[i].made_publication = false;
950 }
951 else
952 {
953 /*
954 * Create publication on publisher. This step should be executed
955 * *before* promoting the subscriber to avoid any transactions
956 * between consistent LSN and the new publication rows (such
957 * transactions wouldn't see the new publication rows resulting in
958 * an error).
959 */
960 create_publication(conn, &dbinfo[i]);
961 }
962
963 /* Create replication slot on publisher */
		/* Only the LSN of the last slot is returned; earlier ones are freed. */
964 if (lsn)
965 pg_free(lsn);
966 lsn = create_logical_replication_slot(conn, &dbinfo[i]);
967 if (lsn == NULL && !dry_run)
968 exit(1);
969
970 /*
971 * Since we are using the LSN returned by the last replication slot as
972 * recovery_target_lsn, this LSN is ahead of the current WAL position
973 * and the recovery waits until the publisher writes a WAL record to
974 * reach the target and ends the recovery. On idle systems, this wait
975 * time is unpredictable and could lead to failure in promoting the
976 * subscriber. To avoid that, insert a harmless WAL record.
977 */
978 if (i == num_dbs - 1 && !dry_run)
979 {
980 PGresult *res;
981
982 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
984 {
986 "could not write an additional WAL record: %s",
989 }
990 PQclear(res);
991 }
992
994 }
995
996 return lsn;
997}
998
/*
 * NOTE(review): original lines 1003 (signature), 1012 and 1014-1015 (the
 * error logging and its exit path) are missing from this copy.
 */
999/*
1000 * Is recovery still in progress?
1001 *
1002 * Returns true when pg_is_in_recovery() reports "t".
1001 */
1002static bool
1004{
1005 PGresult *res;
1006 int ret;
1007
1008 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
1009
1010 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1011 {
1013 "could not obtain recovery progress: %s",
1016 }
1017
1018
	/* Boolean results arrive in text form, so compare against "t". */
1019 ret = strcmp("t", PQgetvalue(res, 0, 0));
1020
1021 PQclear(res);
1022
1023 return ret == 0;
1024}
1025
/*
 * NOTE(review): this copy is missing many original lines of this function
 * (1043-1044, 1046, 1055, 1057, 1059, 1084, 1086-1087, 1095-1096, 1100-1115
 * in part, 1121, 1126, 1128, 1130-1131, 1137, 1139, 1141-1142, 1148, 1150,
 * 1152, 1155, 1166, 1168 and 1173), including the declarations of
 * max_prepared_transactions / max_slot_wal_keep_size and the conditions of
 * several checks.  Restore from upstream before building.
 */
1026/*
1027 * Is the primary server ready for logical replication?
1028 *
1029 * XXX Does it not allow a synchronous replica?
1030 */
1031static void
1032check_publisher(const struct LogicalRepInfo *dbinfo)
1033{
1034 PGconn *conn;
1035 PGresult *res;
	/* Problems are accumulated here so every one is reported before exiting. */
1036 bool failed = false;
1037
1038 char *wal_level;
1039 int max_repslots;
1040 int cur_repslots;
1041 int max_walsenders;
1042 int cur_walsenders;
1045
1047 "checking settings on publisher");
1048
1049 conn = connect_database(dbinfo[0].pubconninfo, true);
1050
1051 /*
1052 * If the primary server is in recovery (i.e. cascading replication),
1053 * objects (publication) cannot be created because it is read only.
1054 */
1056 {
1058 "primary server cannot be in recovery");
1060 }
1061
1062 /*------------------------------------------------------------------------
1063 * Logical replication requires a few parameters to be set on publisher.
1064 * Since these parameters are not a requirement for physical replication,
1065 * we should check them to make sure it won't fail.
1066 *
1067 * - wal_level >= replica
1068 * - max_replication_slots >= current + number of dbs to be converted
1069 * - max_wal_senders >= current + number of dbs to be converted
1070 * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files;
1070 *   in the visible code this is only warned about in dry-run mode)
1071 * -----------------------------------------------------------------------
1072 */
1073 res = PQexec(conn,
1074 "SELECT pg_catalog.current_setting('wal_level'),"
1075 " pg_catalog.current_setting('max_replication_slots'),"
1076 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
1077 " pg_catalog.current_setting('max_wal_senders'),"
1078 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
1079 " pg_catalog.current_setting('max_prepared_transactions'),"
1080 " pg_catalog.current_setting('max_slot_wal_keep_size')");
1081
1082 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1083 {
1085 "could not obtain publisher settings: %s",
1088 }
1089
1090 wal_level = pg_strdup(PQgetvalue(res, 0, 0));
1091 max_repslots = atoi(PQgetvalue(res, 0, 1));
1092 cur_repslots = atoi(PQgetvalue(res, 0, 2));
1093 max_walsenders = atoi(PQgetvalue(res, 0, 3));
1094 cur_walsenders = atoi(PQgetvalue(res, 0, 4));
1097
1098 PQclear(res);
1099
1101 "publisher: wal_level: %s", wal_level);
1103 "publisher: max_replication_slots: %d", max_repslots);
1105 "publisher: current replication slots: %d", cur_repslots);
1107 "publisher: max_wal_senders: %d", max_walsenders);
1109 "publisher: current wal senders: %d", cur_walsenders);
1111 "publisher: max_prepared_transactions: %d",
1114 "publisher: max_slot_wal_keep_size: %s",
1116
1117 disconnect_database(conn, false);
1118
	/* "wal_level >= replica" is checked as "not minimal". */
1119 if (strcmp(wal_level, "minimal") == 0)
1120 {
1122 "publisher requires \"wal_level\" >= \"replica\"");
1123 failed = true;
1124 }
1125
1127 {
1129 "publisher requires %d replication slots, but only %d remain",
1132 "Increase the configuration parameter \"%s\" to at least %d.",
1133 "max_replication_slots", cur_repslots + num_dbs);
1134 failed = true;
1135 }
1136
1138 {
1140 "publisher requires %d WAL sender processes, but only %d remain",
1143 "Increase the configuration parameter \"%s\" to at least %d.",
1144 "max_wal_senders", cur_walsenders + num_dbs);
1145 failed = true;
1146 }
1147
1149 {
1151 "two_phase option will not be enabled for replication slots");
1153 "Subscriptions will be created with the two_phase option disabled. "
1154 "Prepared transactions will be replicated at COMMIT PREPARED.");
1156 "You can use the command-line option --enable-two-phase to enable two_phase.");
1157 }
1158
1159 /*
1160 * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
1161 * is set to a non-default value, it may cause replication failures due to
1162 * required WAL files being prematurely removed.
1163 */
1164 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
1165 {
1167 "required WAL could be removed from the publisher");
1169 "Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1170 "max_slot_wal_keep_size");
1171 }
1172
1174
1175 if (failed)
1176 exit(1);
1177}
1178
1179/*
1180 * Is the standby server ready for logical replication?
1181 *
1182 * XXX Does it not allow a time-delayed replica?
1183 *
1184 * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1185 * is S, it cannot detect there is a replica (server C) because server S starts
1186 * accepting only local connections and server C cannot connect to it. Hence,
1187 * there is not a reliable way to provide a suitable error saying the server C
1188 * will be broken at the end of this process (due to pg_resetwal).
 *
 * Connects to the target server via dbinfo[0].subconninfo and requires it to
 * be a standby. Reads max_active_replication_origins,
 * max_logical_replication_workers, max_worker_processes and primary_slot_name
 * from pg_settings. Every limit that is too small for num_dbs databases is
 * reported, with a hint naming the parameter. All checks run before the
 * process exits with status 1 if any of them failed.
1189 */
1190static void
1192{
1193 PGconn *conn;
1194 PGresult *res;
1195 bool failed = false;
1196
1197 int max_lrworkers;
1198 int max_replorigins;
1199 int max_wprocs;
1200
1202 "checking settings on subscriber");
1203
1204 conn = connect_database(dbinfo[0].subconninfo, true);
1205
1206 /* The target server must be a standby */
1208 {
1210 "target server must be a standby");
1212 }
1213
1214 /*------------------------------------------------------------------------
1215 * Logical replication requires a few parameters to be set on subscriber.
1216 * Since these parameters are not a requirement for physical replication,
1217 * we should check it to make sure it won't fail.
1218 *
1219 * - max_active_replication_origins >= number of dbs to be converted
1220 * - max_logical_replication_workers >= number of dbs to be converted
1221 * - max_worker_processes >= 1 + number of dbs to be converted
1222 *------------------------------------------------------------------------
1223 */
1224 res = PQexec(conn,
1225 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1226 "'max_logical_replication_workers', "
1227 "'max_active_replication_origins', "
1228 "'max_worker_processes', "
1229 "'primary_slot_name') "
1230 "ORDER BY name");
1231
1232 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1233 {
1235 "could not obtain subscriber settings: %s",
1238 }
1239
	/*
	 * The rows are sorted by setting name, so the row order is:
	 * max_active_replication_origins, max_logical_replication_workers,
	 * max_worker_processes, primary_slot_name.
	 *
	 * NOTE(review): PQntuples(res) is not checked before rows 0-3 are read.
	 */
1240 max_replorigins = atoi(PQgetvalue(res, 0, 0));
1241 max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1242 max_wprocs = atoi(PQgetvalue(res, 2, 0));
	/* An empty primary_slot_name means the standby uses no physical slot. */
1243 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1245
1247 "subscriber: max_logical_replication_workers: %d",
1250 "subscriber: max_active_replication_origins: %d", max_replorigins);
1252 "subscriber: max_worker_processes: %d", max_wprocs);
1255 "subscriber: primary_slot_name: %s", primary_slot_name);
1256
1257 PQclear(res);
1258
1259 disconnect_database(conn, false);
1260
1262 {
1264 "subscriber requires %d active replication origins, but only %d remain",
1267 "Increase the configuration parameter \"%s\" to at least %d.",
1268 "max_active_replication_origins", num_dbs);
1269 failed = true;
1270 }
1271
1272 if (max_lrworkers < num_dbs)
1273 {
1275 "subscriber requires %d logical replication workers, but only %d remain",
1278 "Increase the configuration parameter \"%s\" to at least %d.",
1279 "max_logical_replication_workers", num_dbs);
1280 failed = true;
1281 }
1282
	/* One worker slot per database, plus one more. */
1283 if (max_wprocs < num_dbs + 1)
1284 {
1286 "subscriber requires %d worker processes, but only %d remain",
1287 num_dbs + 1, max_wprocs);
1289 "Increase the configuration parameter \"%s\" to at least %d.",
1290 "max_worker_processes", num_dbs + 1);
1291 failed = true;
1292 }
1293
1294 if (failed)
1295 exit(1);
1296}
1297
1298/*
1299 * Drop a specified subscription. This is to avoid duplicate subscriptions on
1300 * the primary (publisher node) and the newly created subscriber. We
1301 * shouldn't drop the associated slot as that would be used by the publisher
1302 * node.
 *
 * Before DROP SUBSCRIPTION runs, the subscription is disabled and detached
 * from its slot (slot_name = NONE), so the drop leaves the slot untouched. In
 * dry-run mode the commands are only reported, not executed.
 *
 * NOTE(review): in the lines shown here, subname is interpolated into the SQL
 * as-is. If the caller does not quote it (e.g. with PQescapeIdentifier),
 * names that need quoting will break these commands -- confirm.
1303 */
1304static void
1306{
1308 PGresult *res;
1309
1310 Assert(conn != NULL);
1311
1312 /*
1313 * Construct a query string. These commands are allowed to be executed
1314 * within a transaction.
1315 */
1316 appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1317 subname);
1318 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1319 subname);
1320 appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1321
1322 if (dry_run)
1324 "dry-run: would drop subscription \"%s\" in database \"%s\"",
1325 subname, dbname);
1326 else
1327 {
1329 "dropping subscription \"%s\" in database \"%s\"",
1330 subname, dbname);
1331
	/* All three statements are sent as one multi-statement query string. */
1332 res = PQexec(conn, query->data);
1333
1334 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1335 {
1337 "could not drop subscription \"%s\": %s",
1340 }
1341
1342 PQclear(res);
1343 }
1344
1345 destroyPQExpBuffer(query);
1346}
1347
1348/*
1349 * Retrieve and drop the pre-existing subscriptions.
 *
 * Queries pg_subscription joined with pg_database for the subscriptions that
 * belong to dbinfo->dbname (passed as an escaped literal). Each one found is
 * dropped on this connection.
1350 */
1351static void
1353 const struct LogicalRepInfo *dbinfo)
1354{
1356 char *dbname;
1357 PGresult *res;
1358
1359 Assert(conn != NULL);
1360
1361 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1362
1363 appendPQExpBuffer(query,
1364 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1365 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1366 "WHERE d.datname = %s",
1367 dbname);
1368 res = PQexec(conn, query->data);
1369
1370 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1371 {
1373 "could not obtain pre-existing subscriptions: %s",
1376 }
1377
	/* Drop every subscription returned for this database. */
1378 for (int i = 0; i < PQntuples(res); i++)
1380 dbinfo->dbname);
1381
1382 PQclear(res);
1383 destroyPQExpBuffer(query);
1385}
1386
1387/*
1388 * Create the subscriptions, adjust the initial location for logical
1389 * replication and enable the subscriptions. That's the last step for logical
1390 * replication setup.
 *
 * For each of the num_dbs databases, this opens a connection to the
 * subscriber and removes pre-existing subscriptions and publications. It then
 * creates the subscription, sets its replication progress, enables it, and
 * disconnects.
1391 */
1392static void
1394{
1395 for (int i = 0; i < num_dbs; i++)
1396 {
1397 PGconn *conn;
1398
1399 /* Connect to subscriber. */
1400 conn = connect_database(dbinfo[i].subconninfo, true);
1401
1402 /*
1403 * We don't need the pre-existing subscriptions on the newly formed
1404 * subscriber. They can connect to other publisher nodes and either
1405 * get some unwarranted data or can lead to ERRORs in connecting to
1406 * such nodes.
1407 */
1409
1410 /* Check and drop the required publications in the given database. */
1412
1413 create_subscription(conn, &dbinfo[i]);
1414
1415 /* Set the replication progress to the correct LSN */
1417
1418 /* Enable subscription */
1419 enable_subscription(conn, &dbinfo[i]);
1420
1421 disconnect_database(conn, false);
1422 }
1423}
1424
1425/*
1426 * Write the required recovery parameters.
 *
 * The parameters make the target server recover on the latest timeline up
 * to recovery_target_lsn = lsn, excluding that LSN, and then promote. The
 * other recovery_target* settings are cleared. In dry-run mode the contents
 * are only logged, with a placeholder LSN. Otherwise they are written to
 * INCLUDED_CONF_FILE, an include_if_exists directive for that file is added,
 * and recovery_params_set is set.
1427 */
1428static void
1429setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1430{
1431 PGconn *conn;
1433
1434 /*
1435 * Despite of the recovery parameters will be written to the subscriber,
1436 * use a publisher connection. The primary_conninfo is generated using the
1437 * connection settings.
1438 */
1439 conn = connect_database(dbinfo[0].pubconninfo, true);
1440
1441 /*
1442 * Write recovery parameters.
1443 *
1444 * The subscriber is not running yet. In dry run mode, the recovery
1445 * parameters *won't* be written. An invalid LSN is used for printing
1446 * purposes. Additional recovery parameters are added here. It avoids
1447 * unexpected behavior such as end of recovery as soon as a consistent
1448 * state is reached (recovery_target) and failure due to multiple recovery
1449 * targets (name, time, xid, LSN).
1450 */
1452 appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1454 "recovery_target_timeline = 'latest'\n");
1455
1456 /*
1457 * Set recovery_target_inclusive = false to avoid reapplying the
1458 * transaction committed at 'lsn' after subscription is enabled. This is
1459 * because the provided 'lsn' is also used as the replication start point
1460 * for the subscription. So, the server can send the transaction committed
1461 * at that 'lsn' after replication is started which can lead to applying
1462 * the same transaction twice if we keep recovery_target_inclusive = true.
1463 */
1465 "recovery_target_inclusive = false\n");
1467 "recovery_target_action = promote\n");
1468 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1469 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1470 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1471
1472 if (dry_run)
1473 {
1474 appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
1476 "recovery_target_lsn = '%X/%08X'\n",
1478 }
1479 else
1480 {
1481 appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1482 lsn);
1483 }
1484
1486 "recovery parameters:\n%s", recoveryconfcontents->data);
1487
1488 if (!dry_run)
1489 {
1491 FILE *fd;
1492
1493 /* Write the recovery parameters to INCLUDED_CONF_FILE */
1496 fd = fopen(conf_filename, "w");
1497 if (fd == NULL)
1498 report_createsub_fatal("could not open file \"%s\": %m", conf_filename);
1499
1501 report_createsub_fatal("could not write to file \"%s\": %m", conf_filename);
1502
		/*
		 * NOTE(review): the result of fclose() is not checked, so an error
		 * reported only when the stream is flushed would go unnoticed.
		 */
1503 fclose(fd);
		/*
		 * Presumably consulted at exit to rename INCLUDED_CONF_FILE to
		 * INCLUDED_CONF_FILE_DISABLED -- confirm.
		 */
1504 recovery_params_set = true;
1505
1506 /* Include conditionally the recovery parameters. */
1509 "include_if_exists '" INCLUDED_CONF_FILE "'\n");
1511 }
1512
1513 disconnect_database(conn, false);
1514}
1515
1516/*
1517 * Drop physical replication slot on primary if the standby was using it. After
1518 * the transformation, it has no use.
1519 *
1520 * XXX we might not fail here. Instead, we provide a warning so the user
1521 * eventually drops this replication slot later.
 *
 * Does nothing when primary_slot_name is not set. Otherwise slotname is
 * dropped on the publisher (dbinfo[0].pubconninfo). If that connection cannot
 * be made, a warning with a hint is reported instead of failing.
1522 */
1523static void
1524drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1525{
1526 PGconn *conn;
1527
1528 /* Replication slot does not exist, do nothing */
1529 if (!primary_slot_name)
1530 return;
1531
	/* Passing false: a failed connection yields NULL rather than exiting. */
1532 conn = connect_database(dbinfo[0].pubconninfo, false);
1533 if (conn != NULL)
1534 {
1535 drop_replication_slot(conn, &dbinfo[0], slotname);
1536 disconnect_database(conn, false);
1537 }
1538 else
1539 {
1541 "could not drop replication slot \"%s\" on primary",
1542 slotname);
1544 "Drop this replication slot soon to avoid retention of WAL files.");
1545 }
1546}
1547
1548/*
1549 * Drop failover replication slots on subscriber. After the transformation,
1550 * they have no use.
1551 *
1552 * XXX We do not fail here. Instead, we provide a warning so the user can drop
1553 * them later.
 *
 * Connects to the subscriber (dbinfo[0].subconninfo) and drops every slot
 * that pg_replication_slots lists with failover set. Connection and query
 * failures are reported with a hint instead of aborting.
1554 */
1555static void
1557{
1558 PGconn *conn;
1559 PGresult *res;
1560
1561 conn = connect_database(dbinfo[0].subconninfo, false);
1562 if (conn != NULL)
1563 {
1564 /* Get failover replication slot names */
1565 res = PQexec(conn,
1566 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1567
1568 if (PQresultStatus(res) == PGRES_TUPLES_OK)
1569 {
1570 /* Remove failover replication slots from subscriber */
1571 for (int i = 0; i < PQntuples(res); i++)
1572 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1573 }
1574 else
1575 {
1577 "could not obtain failover replication slot information: %s",
1580 "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1581 }
1582
1583 PQclear(res);
1584 disconnect_database(conn, false);
1585 }
1586 else
1587 {
1589 "could not drop failover replication slot");
1591 "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1592 }
1593}
1594
1595/*
1596 * Create a logical replication slot and returns a LSN.
1597 *
1598 * CreateReplicationSlot() is not used because it does not provide the one-row
1599 * result set that contains the LSN.
 *
 * The slot named dbinfo->replslotname uses the pgoutput plugin and is neither
 * temporary nor a failover slot. Two-phase decoding is enabled when
 * dbinfos.two_phase is set. On success the returned LSN is a pg_strdup()'d
 * string. NULL is returned in dry-run mode and when slot creation fails.
 * Whenever the function does not fail, including in dry-run mode,
 * dbinfo->made_replslot is set so that cleanup knows about the slot.
1600 */
1601static char *
1603{
1605 PGresult *res = NULL;
1606 const char *slot_name = dbinfo->replslotname;
1607 char *slot_name_esc;
1608 char *lsn = NULL;
1609
1610 Assert(conn != NULL);
1611
1612 if (dry_run)
1614 "dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1615 slot_name, dbinfo->dbname);
1616 else
1618 "creating the replication slot \"%s\" in database \"%s\" on publisher",
1619 slot_name, dbinfo->dbname);
1620
1621 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1622
	/*
	 * Arguments of pg_create_logical_replication_slot(): slot_name, plugin,
	 * temporary, twophase, failover.
	 */
1624 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1626 dbinfos.two_phase ? "true" : "false");
1627
1629
1631 "command is: %s", str->data);
1632
1633 if (!dry_run)
1634 {
1635 res = PQexec(conn, str->data);
1636 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1637 {
1639 "could not create replication slot \"%s\" in database \"%s\": %s",
1640 slot_name, dbinfo->dbname,
1642 PQclear(res);
1644 return NULL;
1645 }
1646
		/* The single result row holds the slot's LSN. */
1647 lsn = pg_strdup(PQgetvalue(res, 0, 0));
1648 PQclear(res);
1649 }
1650
1651 /* For cleanup purposes */
1652 dbinfo->made_replslot = true;
1653
1655
1656 return lsn;
1657}
1658
/*
 * Drop the replication slot slot_name using conn.
 *
 * slot_name is passed to pg_drop_replication_slot() as an escaped literal.
 * In dry-run mode the command is only logged. If the drop fails, the failure
 * is reported and dbinfo->made_replslot is cleared so the drop is not tried
 * again.
 */
1659static void
1661 const char *slot_name)
1662{
1664 char *slot_name_esc;
1665 PGresult *res;
1666
1667 Assert(conn != NULL);
1668
1669 if (dry_run)
1671 "dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1672 slot_name, dbinfo->dbname);
1673 else
1675 "dropping the replication slot \"%s\" in database \"%s\"",
1676 slot_name, dbinfo->dbname);
1677
1678 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1679
1680 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1681
1683
1685 "command is: %s", str->data);
1686
1687 if (!dry_run)
1688 {
	/* The command is a SELECT, so success is reported as PGRES_TUPLES_OK. */
1689 res = PQexec(conn, str->data);
1690 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1691 {
1693 "could not drop replication slot \"%s\" in database \"%s\": %s",
1694 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1695 dbinfo->made_replslot = false; /* don't try again. */
1696 }
1697
1698 PQclear(res);
1699 }
1700
1702}
1703
1704/*
1705 * Reports a suitable message if pg_ctl fails.
 *
 * rc is the status that system() returned for pg_ctl_cmd. A zero status is
 * accepted silently. Any other status is decoded as an exit code, a signal,
 * or (on Windows) an exception code. It is reported together with the
 * failed command, and the program then exits with status 1.
1706 */
1707static void
1708pg_ctl_status(const char *pg_ctl_cmd, int rc)
1709{
1710 if (rc != 0)
1711 {
1712 if (WIFEXITED(rc))
1713 {
1715 "pg_ctl failed with exit code %d",
1716 WEXITSTATUS(rc));
1717 }
1718 else if (WIFSIGNALED(rc))
1719 {
1720#if defined(WIN32)
1722 "pg_ctl was terminated by exception 0x%X",
1723 WTERMSIG(rc));
1725 "See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1726#else
1728 "pg_ctl was terminated by signal %d: %s",
1729 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1730#endif
1731 }
1732 else
1733 {
1735 "pg_ctl exited with unrecognized status %d", rc);
1736 }
1737
1739 "The failed command was: %s", pg_ctl_cmd);
1740 exit(1);
1741 }
1742}
1743
/*
 * Start the target server with pg_ctl and set standby_running.
 *
 * Slot synchronization is always turned off (sync_replication_slots=off), and
 * idle_replication_slot_timeout is set to 0 to avoid slot invalidation. In
 * the conditional block below, the server is started on opt->sub_port.
 * Outside Windows it then accepts only Unix-domain socket connections:
 * listen_addresses is empty, socket permissions are 0700, and the socket
 * goes in opt->socket_dir when that is set. opt->config_file is passed when
 * set. max_logical_replication_workers=0 can be added so that logical
 * replication does not start. A pg_ctl failure is handled by pg_ctl_status(),
 * which exits.
 *
 * NOTE(review): opt->socket_dir and opt->config_file are embedded in the
 * shell command without escaping. Paths containing quotes or whitespace
 * would likely break the command line.
 */
1744static void
1747{
1749 int rc;
1750
1751 appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1753 appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1754
1755 /* Prevent unintended slot invalidation */
1756 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1757
1759 {
1760 appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1761#if !defined(WIN32)
1762
1763 /*
1764 * An empty listen_addresses list means the server does not listen on
1765 * any IP interfaces; only Unix-domain sockets can be used to connect
1766 * to the server. Prevent external connections to minimize the chance
1767 * of failure.
1768 */
1769 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1770 if (opt->socket_dir)
1771 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1772 opt->socket_dir);
1774#endif
1775 }
1776 if (opt->config_file != NULL)
1777 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1778 opt->config_file);
1779
1780 /* Suppress to start logical replication if requested */
1782 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1783
1785 "pg_ctl command is: %s", pg_ctl_cmd->data);
1786 rc = system(pg_ctl_cmd->data);
1787 pg_ctl_status(pg_ctl_cmd->data, rc);
1788 standby_running = true;
1791 "server was started");
1792}
1793
/*
 * Stop the server whose data directory is datadir by running
 * "pg_ctl stop -s", and clear standby_running.
 *
 * NOTE(review): the psprintf()'d pg_ctl_cmd is not freed in the lines shown
 * here. That is harmless for a short-lived tool, but worth a pg_free().
 */
1794static void
1796{
1797 char *pg_ctl_cmd;
1798 int rc;
1799
1800 pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1801 datadir);
1803 "pg_ctl command is: %s", pg_ctl_cmd);
1804 rc = system(pg_ctl_cmd);
1806 standby_running = false;
1808 "server was stopped");
1809}
1810
1811/*
1812 * Returns after the server finishes the recovery process.
1813 *
1814 * If recovery_timeout option is set, terminate abnormally without finishing
1815 * the recovery process. By default, it waits forever.
1816 *
1817 * XXX Is the recovery process still in progress? When recovery process has a
1818 * better progress reporting mechanism, it should be added here.
 *
 * A single connection made from conninfo is polled in a loop. When recovery
 * has finished, recovery_ended is set. When opt->recovery_timeout is
 * positive, the wait is abandoned once timer reaches that value; per the
 * comment in the loop, the unit is seconds.
1819 */
1820static void
1821wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1822{
1823 PGconn *conn;
1824 bool ready = false;
1825 int timer = 0;
1826
1828 "waiting for the target server to reach the consistent state");
1829
1830 conn = connect_database(conninfo, true);
1831
1832 for (;;)
1833 {
1834 /* Did the recovery process finish? We're done if so. */
1836 {
1837 ready = true;
1838 recovery_ended = true;
1839 break;
1840 }
1841
1842 /* Bail out after recovery_timeout seconds if this option is set */
1843 if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1844 {
1847 "recovery timed out");
1849 }
1850
1851 /* Keep waiting */
1854 }
1855
1856 disconnect_database(conn, false);
1857
1858 if (!ready)
1859 report_createsub_fatal("server did not end recovery");
1860
1862 "target server reached the consistent state");
1864 "If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1865}
1866
1867/*
1868 * Create a publication that includes all tables in the database.
 *
 * The existence check uses dbinfo->pubname as an escaped literal
 * (spubname_esc). CREATE PUBLICATION uses ipubname_esc, presumably the
 * identifier-escaped form -- confirm. An existing publication with the same
 * name is reported as an error with a renaming hint. In dry-run mode the
 * command is only logged. dbinfo->made_publication is set afterwards, also
 * in dry-run mode, so that cleanup knows about the publication.
1869 */
1870static void
1872{
1874 PGresult *res;
1875 char *ipubname_esc;
1876 char *spubname_esc;
1877
1878 Assert(conn != NULL);
1879
1881 spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1882
1883 /* Check if the publication already exists */
1885 "SELECT 1 FROM pg_catalog.pg_publication "
1886 "WHERE pubname = %s",
1887 spubname_esc);
1888 res = PQexec(conn, str->data);
1889 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1890 {
1892 "could not obtain publication information: %s",
1895 }
1896
1897 if (PQntuples(res) == 1)
1898 {
1899 /*
1900 * Unfortunately, if it reaches this code path, it will always fail
1901 * (unless you decide to change the existing publication name). That's
1902 * bad but it is very unlikely that the user will choose a name with
1903 * pg_createsubscriber_ prefix followed by the exact database oid and
1904 * a random number.
1905 */
1907 "publication \"%s\" already exists", dbinfo->pubname);
1909 "Consider renaming this publication before continuing.");
1911 }
1912
1913 PQclear(res);
1915
1916 if (dry_run)
1918 "dry-run: would create publication \"%s\" in database \"%s\"",
1919 dbinfo->pubname, dbinfo->dbname);
1920 else
1922 "creating publication \"%s\" in database \"%s\"",
1923 dbinfo->pubname, dbinfo->dbname);
1924
1925 appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1926 ipubname_esc);
1927
1929 "command is: %s", str->data);
1930
1931 if (!dry_run)
1932 {
1933 res = PQexec(conn, str->data);
1934 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1935 {
1937 "could not create publication \"%s\" in database \"%s\": %s",
1938 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1940 }
1941 PQclear(res);
1942 }
1943
1944 /* For cleanup purposes */
1945 dbinfo->made_publication = true;
1946
1950}
1951
1952/*
1953 * Drop the specified publication in the given database.
 *
 * pubname is quoted with PQescapeIdentifier() before it is put into DROP
 * PUBLICATION. dbname is used only in messages. In dry-run mode the command
 * is only logged. If the drop fails, the error is reported and
 * *made_publication is cleared so the drop is not retried.
1954 */
1955static void
1956drop_publication(PGconn *conn, const char *pubname, const char *dbname,
1957 bool *made_publication)
1958{
1960 PGresult *res;
1961 char *pubname_esc;
1962
1963 Assert(conn != NULL);
1964
1965 pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1966
1967 if (dry_run)
1969 "dry-run: would drop publication \"%s\" in database \"%s\"",
1970 pubname, dbname);
1971 else
1973 "dropping publication \"%s\" in database \"%s\"",
1974 pubname, dbname);
1975
1976 appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1977
1979
1981 "command is: %s", str->data);
1982
1983 if (!dry_run)
1984 {
1985 res = PQexec(conn, str->data);
1986 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1987 {
1989 "could not drop publication \"%s\" in database \"%s\": %s",
1990 pubname, dbname, PQresultErrorMessage(res));
1991 *made_publication = false; /* don't try again. */
1992
1993 /*
1994 * Don't disconnect and exit here. This routine is used by primary
1995 * (cleanup publication / replication slot due to an error) and
1996 * subscriber (remove the replicated publications). In both cases,
1997 * it can continue and provide instructions for the user to remove
1998 * it later if cleanup fails.
1999 */
2000 }
2001 PQclear(res);
2002 }
2003
2005}
2006
2007/*
2008 * Retrieve and drop the publications.
2009 *
2010 * Publications copied during physical replication remain on the subscriber
2011 * after promotion. If --clean=publications is specified, drop all existing
2012 * publications in the subscriber database. Otherwise, only drop publications
2013 * that were created by pg_createsubscriber during this operation.
2014 */
2015static void
2017{
2018 PGresult *res;
2020
2021 Assert(conn != NULL);
2022
	/*
	 * NOTE(review): drop_all_pubs is set on a line not shown here. It
	 * presumably reflects --clean=publications, as the header says -- confirm.
	 */
2023 if (drop_all_pubs)
2024 {
2026 "dropping all existing publications in database \"%s\"",
2027 dbinfo->dbname);
2028
2029 /* Fetch all publication names */
2030 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
2031 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2032 {
2034 "could not obtain publication information: %s",
2036 PQclear(res);
2038 }
2039
		/*
		 * NOTE(review): every drop shares &dbinfo->made_publication. A failure
		 * to drop any one publication therefore also clears the flag for the
		 * publication created by this tool.
		 */
2040 /* Drop each publication */
2041 for (int i = 0; i < PQntuples(res); i++)
2042 drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
2043 &dbinfo->made_publication);
2044
2045 PQclear(res);
2046 }
2047 else
2048 {
2049 /* Drop publication only if it was created by this tool */
2050 if (dbinfo->made_publication)
2051 {
2052 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
2053 &dbinfo->made_publication);
2054 }
2055 else
2056 {
2057 if (dry_run)
2059 "dry-run: would preserve existing publication \"%s\" in database \"%s\"",
2060 dbinfo->pubname, dbinfo->dbname);
2061 else
2063 "preserve existing publication \"%s\" in database \"%s\"",
2064 dbinfo->pubname, dbinfo->dbname);
2065 }
2066 }
2067}
2068
2069/*
2070 * Create a subscription with some predefined options.
2071 *
2072 * A replication slot was already created in a previous step. Let's use it. It
2073 * is not required to copy data. The subscription will be created but it will
2074 * not be enabled now. That's because the replication progress must be set and
2075 * the replication origin name (one of the function arguments) contains the
2076 * subscription OID in its name. Once the subscription is created,
2077 * set_replication_progress() can obtain the chosen origin name and set up its
2078 * initial location.
 *
 * The command uses the subscription name, publisher connection string,
 * publication name and slot name from dbinfo, each held in an escaped copy.
 * It sets create_slot = false, enabled = false and copy_data = false, with
 * two_phase following dbinfos.two_phase. In dry-run mode the command is only
 * logged.
2079 */
2080static void
2082{
2084 PGresult *res;
2085 char *pubname_esc;
2086 char *subname_esc;
2087 char *pubconninfo_esc;
2088 char *replslotname_esc;
2089
2090 Assert(conn != NULL);
2091
2096
2097 if (dry_run)
2099 "dry-run: would create subscription \"%s\" in database \"%s\"",
2100 dbinfo->subname, dbinfo->dbname);
2101 else
2103 "creating subscription \"%s\" in database \"%s\"",
2104 dbinfo->subname, dbinfo->dbname);
2105
2107 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
2108 "WITH (create_slot = false, enabled = false, "
2109 "slot_name = %s, copy_data = false, two_phase = %s)",
2111 dbinfos.two_phase ? "true" : "false");
2112
2117
2119 "command is: %s", str->data);
2120
2121 if (!dry_run)
2122 {
2123 res = PQexec(conn, str->data);
2124 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2125 {
2127 "could not create subscription \"%s\" in database \"%s\": %s",
2128 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
2130 }
2131 PQclear(res);
2132 }
2133
2135}
2136
2137/*
2138 * Sets the replication progress to the consistent LSN.
2139 *
2140 * The subscriber caught up to the consistent LSN provided by the last
2141 * replication slot that was created. The goal is to set up the initial
2142 * location for the logical replication that is the exact LSN that the
2143 * subscriber was promoted. Once the subscription is enabled it will start
2144 * streaming from that location onwards. In dry run mode, the subscription OID
2145 * and LSN are set to invalid values for printing purposes.
 *
 * The subscription OID is looked up by subscription name and database name.
 * Outside dry-run mode, exactly one matching row is required. The origin
 * "pg_<suboid>" is then advanced to lsn with pg_replication_origin_advance().
2146 */
2147static void
2148set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
2149{
2151 PGresult *res;
2152 Oid suboid;
2153 char *subname;
2154 char *dbname;
2155 char *originname;
2156 char *lsnstr;
2157
2158 Assert(conn != NULL);
2159
2160 subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
2161 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
2162
2164 "SELECT s.oid FROM pg_catalog.pg_subscription s "
2165 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
2166 "WHERE s.subname = %s AND d.datname = %s",
2167 subname, dbname);
2168
2169 res = PQexec(conn, str->data);
2170 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2171 {
2173 "could not obtain subscription OID: %s",
2176 }
2177
	/*
	 * In dry-run mode the subscription was never created, so a missing row
	 * is tolerated.
	 */
2178 if (PQntuples(res) != 1 && !dry_run)
2179 {
2181 "could not obtain subscription OID: got %d rows, expected %d row",
2182 PQntuples(res), 1);
2184 }
2185
2186 if (dry_run)
2187 {
2190 }
2191 else
2192 {
2193 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2194 lsnstr = psprintf("%s", lsn);
2195 }
2196
2197 PQclear(res);
2198
2199 /*
2200 * The origin name is defined as pg_%u. %u is the subscription OID. See
2201 * ApplyWorkerMain().
2202 */
2203 originname = psprintf("pg_%u", suboid);
2204
2205 if (dry_run)
2207 "dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2208 originname, lsnstr, dbinfo->dbname);
2209 else
2211 "setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2212 originname, lsnstr, dbinfo->dbname);
2213
	/*
	 * NOTE(review): originname ("pg_%u") and lsnstr are placed inside single
	 * quotes without PQescapeLiteral(). This is presumably safe because both
	 * are generated by this tool -- confirm that lsn never comes from user
	 * input.
	 */
2216 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
2218
2220 "command is: %s", str->data);
2221
2222 if (!dry_run)
2223 {
2224 res = PQexec(conn, str->data);
2225 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2226 {
2228 "could not set replication progress for subscription \"%s\": %s",
2229 dbinfo->subname, PQresultErrorMessage(res));
2231 }
2232 PQclear(res);
2233 }
2234
2238 pg_free(lsnstr);
2240}
2241
2242/*
2243 * Enables the subscription.
2244 *
2245 * The subscription was created in a previous step but it was disabled. After
2246 * adjusting the initial logical replication location, enable the subscription.
 *
 * The subscription name is quoted with PQescapeIdentifier() for the
 * ALTER SUBSCRIPTION ... ENABLE command. In dry-run mode the command is only
 * logged.
2247 */
2248static void
2250{
2252 PGresult *res;
2253 char *subname;
2254
2255 Assert(conn != NULL);
2256
2257 subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2258
2259 if (dry_run)
2261 "dry-run: would enable subscription \"%s\" in database \"%s\"",
2262 dbinfo->subname, dbinfo->dbname);
2263 else
2265 "enabling subscription \"%s\" in database \"%s\"",
2266 dbinfo->subname, dbinfo->dbname);
2267
2268 appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
2269
2271 "command is: %s", str->data);
2272
2273 if (!dry_run)
2274 {
2275 res = PQexec(conn, str->data);
2276 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2277 {
2279 "could not enable subscription \"%s\": %s",
2280 dbinfo->subname, PQresultErrorMessage(res));
2282 }
2283
2284 PQclear(res);
2285 }
2286
2289}
2290
2291/*
2292 * Fetch a list of all connectable non-template databases from the source server
2293 * and form a list such that they appear as if the user has specified multiple
2294 * --database options, one for each source database.
 *
 * When dbnamespecified is false, the postgres database is tried first and
 * template1 is the fallback. Every database listed increments num_dbs.
2295 */
2296static void
2298 bool dbnamespecified)
2299{
2300 PGconn *conn;
2301 PGresult *res;
2302
2303 /* If a database name was specified, just connect to it. */
2304 if (dbnamespecified)
2306 else
2307 {
2308 /* Otherwise, try postgres first and then template1. */
2309 char *conninfo;
2310
2311 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2312 conn = connect_database(conninfo, false);
2313 pg_free(conninfo);
2314 if (!conn)
2315 {
2316 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2317 conn = connect_database(conninfo, true);
2318 pg_free(conninfo);
2319 }
2320 }
2321
	/*
	 * Databases are listed in name order. datconnlimit = -2 presumably marks
	 * an invalid, partially dropped database (DATCONNLIMIT_INVALID_DB) --
	 * confirm.
	 *
	 * NOTE(review): unlike the other catalog queries in this file,
	 * pg_database is not schema-qualified with pg_catalog here.
	 */
2322 res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2323 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2324 {
2326 "could not obtain a list of databases: %s",
2328 PQclear(res);
2330 }
2331
2332 for (int i = 0; i < PQntuples(res); i++)
2333 {
2334 const char *dbname = PQgetvalue(res, i, 0);
2335
2337
2338 /* Increment num_dbs to reflect multiple --database options */
2339 num_dbs++;
2340 }
2341
2342 PQclear(res);
2343 disconnect_database(conn, false);
2344}
2345
2346int
2347main(int argc, char **argv)
2348{
2349 static struct option long_options[] =
2350 {
2351 {"all", no_argument, NULL, 'a'},
2352 {"database", required_argument, NULL, 'd'},
2353 {"pgdata", required_argument, NULL, 'D'},
2354 {"dry-run", no_argument, NULL, 'n'},
2355 {"subscriber-port", required_argument, NULL, 'p'},
2356 {"publisher-server", required_argument, NULL, 'P'},
2357 {"socketdir", required_argument, NULL, 's'},
2358 {"recovery-timeout", required_argument, NULL, 't'},
2359 {"enable-two-phase", no_argument, NULL, 'T'},
2360 {"subscriber-username", required_argument, NULL, 'U'},
2361 {"verbose", no_argument, NULL, 'v'},
2362 {"version", no_argument, NULL, 'V'},
2363 {"help", no_argument, NULL, '?'},
2364 {"config-file", required_argument, NULL, 1},
2365 {"publication", required_argument, NULL, 2},
2366 {"replication-slot", required_argument, NULL, 3},
2367 {"subscription", required_argument, NULL, 4},
2368 {"clean", required_argument, NULL, 5},
2369 {NULL, 0, NULL, 0}
2370 };
2371
2372 struct CreateSubscriberOptions opt = {0};
2373
2374 int c;
2375 int option_index;
2376
2377 char *pub_base_conninfo;
2378 char *sub_base_conninfo;
2379 char *dbname_conninfo = NULL;
2380
2383 struct stat statbuf;
2384
2385 char *consistent_lsn;
2386
2387 char pidfile[MAXPGPATH];
2388
2389 pg_logging_init(argv[0]);
2391 progname = get_progname(argv[0]);
2392 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2393
2394 if (argc > 1)
2395 {
2396 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2397 {
2398 usage();
2399 exit(0);
2400 }
2401 else if (strcmp(argv[1], "-V") == 0
2402 || strcmp(argv[1], "--version") == 0)
2403 {
2404 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2405 exit(0);
2406 }
2407 }
2408
2409 /* Default settings */
2411 opt.config_file = NULL;
2412 opt.pub_conninfo_str = NULL;
2413 opt.socket_dir = NULL;
2415 opt.sub_username = NULL;
2416 opt.two_phase = false;
2418 {
2419 0
2420 };
2421 opt.recovery_timeout = 0;
2422 opt.all_dbs = false;
2423
2424 /*
2425 * Don't allow it to be run as root. It uses pg_ctl which does not allow
2426 * it either.
2427 */
2428#ifndef WIN32
2429 if (geteuid() == 0)
2430 {
2432 "cannot be executed by \"root\"");
2434 "You must run %s as the PostgreSQL superuser.",
2435 progname);
2436 exit(1);
2437 }
2438#endif
2439
2441
2442 while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
2443 long_options, &option_index)) != -1)
2444 {
2445 switch (c)
2446 {
2447 case 'a':
2448 opt.all_dbs = true;
2449 break;
2450 case 'd':
2452 {
2454 num_dbs++;
2455 }
2456 else
2457 report_createsub_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2458 break;
2459 case 'D':
2462 break;
2463 case 'n':
2464 dry_run = true;
2465 break;
2466 case 'p':
2467 opt.sub_port = pg_strdup(optarg);
2468 break;
2469 case 'P':
2471 break;
2472 case 's':
2475 break;
2476 case 't':
2478 break;
2479 case 'T':
2480 opt.two_phase = true;
2481 break;
2482 case 'U':
2484 break;
2485 case 'v':
2487 break;
2488 case 1:
2490 break;
2491 case 2:
2493 {
2495 num_pubs++;
2496 }
2497 else
2498 report_createsub_fatal("publication \"%s\" specified more than once for --publication", optarg);
2499 break;
2500 case 3:
2502 {
2504 num_replslots++;
2505 }
2506 else
2507 report_createsub_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2508 break;
2509 case 4:
2511 {
2513 num_subs++;
2514 }
2515 else
2516 report_createsub_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2517 break;
2518 case 5:
2521 else
2522 report_createsub_fatal("object type \"%s\" specified more than once for --clean", optarg);
2523 break;
2524 default:
2525 /* getopt_long already emitted a complaint */
2527 "Try \"%s --help\" for more information.",
2528 progname);
2529 exit(1);
2530 }
2531 }
2532
2533 /* Validate that --all is not used with incompatible options */
2534 if (opt.all_dbs)
2535 {
2536 char *bad_switch = NULL;
2537
2538 if (num_dbs > 0)
2539 bad_switch = "--database";
2540 else if (num_pubs > 0)
2541 bad_switch = "--publication";
2542 else if (num_replslots > 0)
2543 bad_switch = "--replication-slot";
2544 else if (num_subs > 0)
2545 bad_switch = "--subscription";
2546
2547 if (bad_switch)
2548 {
2550 "options %s and %s cannot be used together",
2551 bad_switch, "-a/--all");
2553 "Try \"%s --help\" for more information.",
2554 progname);
2555 exit(1);
2556 }
2557 }
2558
2559 /* Any non-option arguments? */
2560 if (optind < argc)
2561 {
2563 "too many command-line arguments (first is \"%s\")",
2564 argv[optind]);
2566 "Try \"%s --help\" for more information.", progname);
2567 exit(1);
2568 }
2569
2570 /* Required arguments */
2571 if (subscriber_dir == NULL)
2572 {
2574 "no subscriber data directory specified");
2576 "Try \"%s --help\" for more information.", progname);
2577 exit(1);
2578 }
2579
2580 /* If socket directory is not provided, use the current directory */
2581 if (opt.socket_dir == NULL)
2582 {
2583 char cwd[MAXPGPATH];
2584
2585 if (!getcwd(cwd, MAXPGPATH))
2586 report_createsub_fatal("could not determine current directory");
2587 opt.socket_dir = pg_strdup(cwd);
2589 }
2590
2591 /*
2592 * Parse connection string. Build a base connection string that might be
2593 * reused by multiple databases.
2594 */
2595 if (opt.pub_conninfo_str == NULL)
2596 {
2597 /*
2598 * TODO use primary_conninfo (if available) from subscriber and
2599 * extract publisher connection string. Assume that there are
2600 * identical entries for physical and logical replication. If there is
2601 * not, we would fail anyway.
2602 */
2604 "no publisher connection string specified");
2606 "Try \"%s --help\" for more information.", progname);
2607 exit(1);
2608 }
2609
2610 if (dry_run)
2612 "Executing in dry-run mode.\n"
2613 "The target directory will not be modified.");
2614
2616 "validating publisher connection string");
2619 if (pub_base_conninfo == NULL)
2620 exit(1);
2621
2623 "validating subscriber connection string");
2625
2626 /*
2627 * Fetch all databases from the source (publisher) and treat them as if
2628 * the user specified has multiple --database options, one for each source
2629 * database.
2630 */
2631 if (opt.all_dbs)
2632 {
2634
2636 }
2637
2638 if (opt.database_names.head == NULL)
2639 {
2641 "no database was specified");
2642
2643 /*
2644 * Try to obtain the dbname from the publisher conninfo. If dbname
2645 * parameter is not available, error out.
2646 */
2647 if (dbname_conninfo)
2648 {
2650 num_dbs++;
2651
2653 "database name \"%s\" was extracted from the publisher connection string",
2655 }
2656 else
2657 {
2659 "no database name specified");
2661 "Try \"%s --help\" for more information.",
2662 progname);
2663 exit(1);
2664 }
2665 }
2666
2667 /* Number of object names must match number of databases */
2668 if (num_pubs > 0 && num_pubs != num_dbs)
2669 {
2671 "wrong number of publication names specified");
2673 "The number of specified publication names (%d) must match the number of specified database names (%d).",
2674 num_pubs, num_dbs);
2675 exit(1);
2676 }
2677 if (num_subs > 0 && num_subs != num_dbs)
2678 {
2680 "wrong number of subscription names specified");
2682 "The number of specified subscription names (%d) must match the number of specified database names (%d).",
2683 num_subs, num_dbs);
2684 exit(1);
2685 }
2686 if (num_replslots > 0 && num_replslots != num_dbs)
2687 {
2689 "wrong number of replication slot names specified");
2691 "The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2693 exit(1);
2694 }
2695
2696 /* Verify the object types specified for removal from the subscriber */
2697 for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2698 {
2699 if (pg_strcasecmp(cell->val, "publications") == 0)
2701 else
2702 {
2704 "invalid object type \"%s\" specified for %s",
2705 cell->val, "--clean");
2707 "The valid value is: \"%s\"", "publications");
2708 exit(1);
2709 }
2710 }
2711
2712 /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2713 pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2714 pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2715
2716 /* Rudimentary check for a data directory */
2718
2720
2721 /*
2722 * Store database information for publisher and subscriber. It should be
2723 * called before atexit() because its return is used in the
2724 * cleanup_objects_atexit().
2725 */
2727
2728 /* Register a function to clean up objects in case of failure */
2730
2731 /*
2732 * Check if the subscriber data directory has the same system identifier
2733 * than the publisher data directory.
2734 */
2737 if (pub_sysid != sub_sysid)
2738 report_createsub_fatal("subscriber data directory is not a copy of the source database cluster");
2739
2740 /* Subscriber PID file */
2741 snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2742
2743 /*
2744 * The standby server must not be running. If the server is started under
2745 * service manager and pg_createsubscriber stops it, the service manager
2746 * might react to this action and start the server again. Therefore,
2747 * refuse to proceed if the server is running to avoid possible failures.
2748 */
2749 if (stat(pidfile, &statbuf) == 0)
2750 {
2752 "standby server is running");
2754 "Stop the standby server and try again.");
2755 exit(1);
2756 }
2757
2758 /*
2759 * Start a short-lived standby server with temporary parameters (provided
2760 * by command-line options). The goal is to avoid connections during the
2761 * transformation steps.
2762 */
2764 "starting the standby server with command-line options");
2765 start_standby_server(&opt, true, false);
2766
2767 /* Check if the standby server is ready for logical replication */
2769
2770 /* Check if the primary server is ready for logical replication */
2772
2773 /*
2774 * Stop the target server. The recovery process requires that the server
2775 * reaches a consistent state before targeting the recovery stop point.
2776 * Make sure a consistent state is reached (stop the target server
2777 * guarantees it) *before* creating the replication slots in
2778 * setup_publisher().
2779 */
2781 "stopping the subscriber");
2783
2784 /* Create the required objects for each database on publisher */
2786
2787 /* Write the required recovery parameters */
2789
2790 /*
2791 * Start subscriber so the recovery parameters will take effect. Wait
2792 * until accepting connections. We don't want to start logical replication
2793 * during setup.
2794 */
2796 "starting the subscriber");
2797 start_standby_server(&opt, true, true);
2798
2799 /* Waiting the subscriber to be promoted */
2801
2802 /*
2803 * Create the subscription for each database on subscriber. It does not
2804 * enable it immediately because it needs to adjust the replication start
2805 * point to the LSN reported by setup_publisher(). It also cleans up
2806 * publications created by this tool and replication to the standby.
2807 */
2809
2810 /* Remove primary_slot_name if it exists on primary */
2812
2813 /* Remove failover replication slots if they exist on subscriber */
2815
2816 /* Stop the subscriber */
2818 "stopping the subscriber");
2820
2821 /* Change system identifier from subscriber */
2823
2824 success = true;
2825
2827 "Done!");
2828
2829 return 0;
2830}
#define pg_noreturn
Definition c.h:184
#define Assert(condition)
Definition c.h:945
#define PG_TEXTDOMAIN(domain)
Definition c.h:1305
#define pg_attribute_printf(f, a)
Definition c.h:262
uint32 bits32
Definition c.h:627
uint64_t uint64
Definition c.h:619
uint32_t uint32
Definition c.h:618
int find_my_exec(const char *argv0, char *retpath)
Definition exec.c:161
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition exec.c:430
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition exec.c:311
int main(void)
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition connect.h:25
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
#define USECS_PER_SEC
Definition timestamp.h:134
#define _(x)
Definition elog.c:95
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:783
PGconn * PQconnectdb(const char *conninfo)
Definition fe-connect.c:826
void PQconninfoFree(PQconninfoOption *connOptions)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
void PQfreemem(void *ptr)
Definition fe-exec.c:4049
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition fe-exec.c:4399
PGresult * PQexec(PGconn *conn, const char *query)
Definition fe-exec.c:2279
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition fe-exec.c:4405
void * pg_malloc(size_t size)
Definition fe_memutils.c:47
char * pg_strdup(const char *in)
Definition fe_memutils.c:85
void pg_free(void *ptr)
#define pg_malloc_array(type, count)
Definition fe_memutils.h:56
uint32 get_pg_version(const char *datadir, char **version_str)
Definition version.c:44
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition getopt_long.c:60
#define no_argument
Definition getopt_long.h:25
#define required_argument
Definition getopt_long.h:26
const char * str
long val
Definition informix.c:689
int i
Definition isn.c:77
#define PQresultErrorMessage
#define PQgetvalue
#define PQclear
#define PQresultStatus
#define PQntuples
@ CONNECTION_OK
Definition libpq-fe.h:90
@ PGRES_COMMAND_OK
Definition libpq-fe.h:131
@ PGRES_TUPLES_OK
Definition libpq-fe.h:134
void pg_logging_increase_verbosity(void)
Definition logging.c:185
void pg_logging_init(const char *argv0)
Definition logging.c:83
void pg_logging_set_level(enum pg_log_level new_level)
Definition logging.c:176
void pg_log_generic_v(enum pg_log_level level, enum pg_log_part part, const char *pg_restrict fmt, va_list ap)
Definition logging.c:219
pg_log_part
Definition logging.h:62
@ PG_LOG_PRIMARY
Definition logging.h:67
@ PG_LOG_HINT
Definition logging.h:79
@ PG_LOG_DETAIL
Definition logging.h:73
pg_log_level
Definition logging.h:17
@ PG_LOG_INFO
Definition logging.h:33
@ PG_LOG_DEBUG
Definition logging.h:26
@ PG_LOG_WARNING
Definition logging.h:38
@ PG_LOG_ERROR
Definition logging.h:43
static char * errmsg
static PQExpBuffer recoveryconfcontents
#define MAXPGPATH
static void static pg_noreturn void report_createsub_fatal(const char *pg_restrict fmt,...) pg_attribute_printf(1
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
#define WAIT_INTERVAL
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
static void stop_standby_server(const char *datadir)
static char * pg_ctl_path
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static char * get_exec_path(const char *argv0, const char *progname)
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static char * subscriber_dir
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
static char * primary_slot_name
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static pg_prng_state prng_state
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
static void check_data_directory(const char *datadir)
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
#define INCLUDED_CONF_FILE
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static void report_createsub_log(enum pg_log_level, enum pg_log_part, const char *pg_restrict fmt,...) pg_attribute_printf(3
static struct LogicalRepInfos dbinfos
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
static bool success
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static bool recovery_ended
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static void drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void disconnect_database(PGconn *conn, bool exit_on_error)
static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static bool find_publication(PGconn *conn, const char *pubname, const char *dbname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static bool dry_run
static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
#define OBJECTTYPE_PUBLICATIONS
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static char * pg_resetwal_path
static char * generate_object_name(PGconn *conn)
static const char * progname
static void usage(void)
#define INCLUDED_CONF_FILE_DISABLED
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
static bool recovery_params_set
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
static char * argv0
Definition pg_ctl.c:94
static char * exec_path
Definition pg_ctl.c:89
PGDLLIMPORT int optind
Definition getopt.c:51
PGDLLIMPORT char * optarg
Definition getopt.c:53
uint32 pg_prng_uint32(pg_prng_state *state)
Definition pg_prng.c:227
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition pg_prng.c:89
char * datadir
NameData subname
static char buf[DEFAULT_XLOG_SEG_SIZE]
int pg_strcasecmp(const char *s1, const char *s2)
const char * pg_strsignal(int signum)
Definition pgstrsignal.c:39
void canonicalize_path(char *path)
Definition path.c:337
#define snprintf
Definition port.h:260
#define DEVNULL
Definition port.h:161
const char * get_progname(const char *argv0)
Definition path.c:652
#define printf(...)
Definition port.h:266
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
#define InvalidOid
unsigned int Oid
PQExpBuffer createPQExpBuffer(void)
Definition pqexpbuffer.c:72
void resetPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
char * c
static int fd(const char *x, int i)
static int fb(int x)
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
void get_restricted_token(void)
void pg_usleep(long microsec)
Definition signal.c:53
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition simple_list.c:87
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition simple_list.c:63
char * dbname
Definition streamutil.c:49
PGconn * conn
Definition streamutil.c:52
void appendShellString(PQExpBuffer buf, const char *str)
void appendConnStrVal(PQExpBuffer buf, const char *str)
SimpleStringList database_names
SimpleStringList objecttypes_to_clean
SimpleStringList replslot_names
struct LogicalRepInfo * dbinfo
SimpleStringListCell * head
Definition simple_list.h:42
#define GET_PG_MAJORVERSION_NUM(v)
Definition version.h:19
char * wait_result_to_str(int exitstatus)
Definition wait_error.c:33
#define stat
Definition win32_port.h:74
#define WIFEXITED(w)
Definition win32_port.h:150
#define WIFSIGNALED(w)
Definition win32_port.h:151
#define WTERMSIG(w)
Definition win32_port.h:153
#define WEXITSTATUS(w)
Definition win32_port.h:152
int gettimeofday(struct timeval *tp, void *tzp)
int wal_level
Definition xlog.c:135
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28