PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pg_createsubscriber.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * pg_createsubscriber.c
4 * Create a new logical replica from a standby server
5 *
6 * Copyright (c) 2024-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/bin/pg_basebackup/pg_createsubscriber.c
10 *
11 *-------------------------------------------------------------------------
12 */
13
14#include "postgres_fe.h"
15
16#include <sys/stat.h>
17#include <sys/time.h>
18#include <sys/wait.h>
19#include <time.h>
20
21#include "common/connect.h"
23#include "common/file_perm.h"
24#include "common/file_utils.h"
25#include "common/logging.h"
26#include "common/pg_prng.h"
28#include "datatype/timestamp.h"
32#include "fe_utils/version.h"
33#include "getopt_long.h"
34
35#define DEFAULT_SUB_PORT "50432"
36#define OBJECTTYPE_PUBLICATIONS 0x0001
37
38/*
39 * Configuration files for recovery parameters.
40 *
41 * The recovery parameters are set in INCLUDED_CONF_FILE, itself loaded by
42 * the server through an include_if_exists in postgresql.auto.conf.
43 *
44 * INCLUDED_CONF_FILE is renamed to INCLUDED_CONF_FILE_DISABLED when exiting,
45 * so that the recovery parameters set by this tool never take effect on node
46 * restart. The contents of INCLUDED_CONF_FILE_DISABLED can be useful for
47 * debugging.
48 */
49#define PG_AUTOCONF_FILENAME "postgresql.auto.conf"
50#define INCLUDED_CONF_FILE "pg_createsubscriber.conf"
51#define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled"
52
53#define SERVER_LOG_FILE_NAME "pg_createsubscriber_server.log"
54#define INTERNAL_LOG_FILE_NAME "pg_createsubscriber_internal.log"
55
56/* Command-line options */
58{
59 char *config_file; /* configuration file */
60 char *log_dir; /* log directory name */
61 char *pub_conninfo_str; /* publisher connection string */
62 char *socket_dir; /* directory for Unix-domain socket, if any */
63 char *sub_port; /* subscriber port number */
64 const char *sub_username; /* subscriber username */
65 bool two_phase; /* enable-two-phase option */
66 SimpleStringList database_names; /* list of database names */
67 SimpleStringList pub_names; /* list of publication names */
68 SimpleStringList sub_names; /* list of subscription names */
69 SimpleStringList replslot_names; /* list of replication slot names */
70 int recovery_timeout; /* stop recovery after this time */
71 bool all_dbs; /* all option */
72 SimpleStringList objecttypes_to_clean; /* list of object types to cleanup */
73};
74
75/* per-database publication/subscription info */
77{
78 char *dbname; /* database name */
79 char *pubconninfo; /* publisher connection string */
80 char *subconninfo; /* subscriber connection string */
81 char *pubname; /* publication name */
82 char *subname; /* subscription name */
83 char *replslotname; /* replication slot name */
84
85 bool made_replslot; /* replication slot was created */
86 bool made_publication; /* publication was created */
87};
88
89/*
90 * Information shared across all the databases (or publications and
91 * subscriptions).
92 */
94{
96 bool two_phase; /* enable-two-phase option */
97 uint32 objecttypes_to_clean; /* flags indicating which object types
98 * to clean up on subscriber */
99};
100
101static void cleanup_objects_atexit(void);
102static void usage(void);
103static char *get_base_conninfo(const char *conninfo, char **dbname);
104static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
105static char *get_exec_path(const char *argv0, const char *progname);
106static void check_data_directory(const char *datadir);
107static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
108static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
109 const char *pub_base_conninfo,
110 const char *sub_base_conninfo);
111static PGconn *connect_database(const char *conninfo, bool exit_on_error);
112static void disconnect_database(PGconn *conn, bool exit_on_error);
113static uint64 get_primary_sysid(const char *conninfo);
114static uint64 get_standby_sysid(const char *datadir);
115static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
116static bool server_is_in_recovery(PGconn *conn);
117static char *generate_object_name(PGconn *conn);
118static void check_publisher(const struct LogicalRepInfo *dbinfo);
119static char *setup_publisher(struct LogicalRepInfo *dbinfo);
120static void check_subscriber(const struct LogicalRepInfo *dbinfo);
121static void setup_subscriber(struct LogicalRepInfo *dbinfo,
122 const char *consistent_lsn);
123static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
124 const char *lsn);
125static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
126 const char *slotname);
127static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
129 struct LogicalRepInfo *dbinfo);
130static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
131 const char *slot_name);
132static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
133static void start_standby_server(const struct CreateSubscriberOptions *opt,
136static void stop_standby_server(const char *datadir);
137static void wait_for_end_recovery(const char *conninfo,
138 const struct CreateSubscriberOptions *opt);
139static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
140static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
141static void drop_publication(PGconn *conn, const char *pubname,
142 const char *dbname, bool *made_publication);
143static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
144static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
145static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
146 const char *lsn);
147static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
149 const struct LogicalRepInfo *dbinfo);
150static void drop_existing_subscription(PGconn *conn, const char *subname,
151 const char *dbname);
153 bool dbnamespecified);
154
155#define WAIT_INTERVAL 1 /* 1 second */
156
157static const char *progname;
158
159static char *primary_slot_name = NULL;
160static bool dry_run = false;
161
162static bool success = false;
163
165static int num_dbs = 0; /* number of specified databases */
166static int num_pubs = 0; /* number of specified publications */
167static int num_subs = 0; /* number of specified subscriptions */
168static int num_replslots = 0; /* number of specified replication slots */
169
171
172static char *pg_ctl_path = NULL;
173static char *pg_resetwal_path = NULL;
174
175static char *logdir = NULL; /* Subdirectory of the user specified logdir
176 * where the log files are written (if
177 * specified) */
178
179/* standby / subscriber data directory */
180static char *subscriber_dir = NULL;
181
182static bool recovery_ended = false;
183static bool standby_running = false;
184static bool recovery_params_set = false;
185
186
187/*
188 * Clean up objects created by pg_createsubscriber.
189 *
190 * Publications and replication slots are created on the primary. Depending
191 * on the step where it failed, already-created objects should be removed if
192 * possible (sometimes this won't work due to a connection issue).
193 * There is no cleanup on the target server *after* its promotion, because any
194 * failure at this point means recreating the physical replica and starting
195 * again.
196 *
197 * The recovery configuration is always removed, by renaming the included
198 * configuration file out of the way.
199 */
200static void
202{
203 /* Rename the included configuration file, if necessary. */
205 {
208
213
215 {
216 /* durable_rename() has already logged something. */
217 pg_log_warning_hint("A manual removal of the recovery parameters may be required.");
218 }
219 }
220
221 if (success)
222 return;
223
224 /*
225 * If the server is promoted, there is no way to use the current setup
226 * again. Warn the user that a new replication setup should be done before
227 * trying again.
228 */
229 if (recovery_ended)
230 {
231 pg_log_warning("failed after the end of recovery");
232 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
233 "You must recreate the physical replica before continuing.");
234 }
235
236 for (int i = 0; i < num_dbs; i++)
237 {
238 struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
239
240 if (dbinfo->made_publication || dbinfo->made_replslot)
241 {
242 PGconn *conn;
243
244 conn = connect_database(dbinfo->pubconninfo, false);
245 if (conn != NULL)
246 {
247 if (dbinfo->made_publication)
248 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
249 &dbinfo->made_publication);
250 if (dbinfo->made_replslot)
251 drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
253 }
254 else
255 {
256 /*
257 * If a connection could not be established, inform the user
258 * that some objects were left on primary and should be
259 * removed before trying again.
260 */
261 if (dbinfo->made_publication)
262 {
263 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
264 dbinfo->pubname,
265 dbinfo->dbname);
266 pg_log_warning_hint("Drop this publication before trying again.");
267 }
268 if (dbinfo->made_replslot)
269 {
270 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
271 dbinfo->replslotname,
272 dbinfo->dbname);
273 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
274 }
275 }
276 }
277 }
278
279 if (standby_running)
281}
282
/*
 * Print the command-line help text to standard output.
 *
 * Every message goes through the _() macro (presumably the gettext
 * translation wrapper -- confirm against postgres_fe.h) and is written with
 * printf().  The program name, the default subscriber port
 * (DEFAULT_SUB_PORT) and the package name/URL/bug-report address are
 * substituted into the messages at run time.  The function takes no
 * arguments and returns nothing.
 */
283static void
284usage(void)
285{
286 printf(_("%s creates a new logical replica from a standby server.\n\n"),
287 progname);
288 printf(_("Usage:\n"));
289 printf(_(" %s [OPTION]...\n"), progname);
/* One printf() per option; long descriptions continue on a second line. */
290 printf(_("\nOptions:\n"));
291 printf(_(" -a, --all create subscriptions for all databases except template\n"
292 " databases and databases that don't allow connections\n"));
293 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
294 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
295 printf(_(" -l, --logdir=LOGDIR location for the log directory\n"));
296 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
297 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
298 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
299 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
300 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
301 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
302 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
303 printf(_(" -v, --verbose output verbose messages\n"));
/* The accepted object type is passed as an argument rather than embedded in the message. */
304 printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
305 " databases on the subscriber; accepts: \"%s\"\n"), "publications");
306 printf(_(" --config-file=FILENAME use specified main server configuration\n"
307 " file when running target cluster\n"));
308 printf(_(" --publication=NAME publication name\n"));
309 printf(_(" --replication-slot=NAME replication slot name\n"));
310 printf(_(" --subscription=NAME subscription name\n"));
311 printf(_(" -V, --version output version information, then exit\n"));
312 printf(_(" -?, --help show this help, then exit\n"));
/* Trailer: where to report bugs and the project home page. */
313 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
314 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
315}
316
317/*
318 * Subroutine to append "keyword=value" to a connection string,
319 * with proper quoting of the value. (We assume keywords don't need that.)
320 */
321static void
322appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
323{
324 if (buf->len > 0)
326 appendPQExpBufferStr(buf, keyword);
329}
330
331/*
332 * Validate a connection string. Returns a base connection string that is a
333 * connection string without a database name.
334 *
335 * Since we might process multiple databases, each database name will be
336 * appended to this base connection string to provide a final connection
337 * string. If the second argument (dbname) is not null, returns dbname if the
338 * provided connection string contains it.
339 *
340 * It is the caller's responsibility to free the returned connection string and
341 * dbname.
342 */
343static char *
344get_base_conninfo(const char *conninfo, char **dbname)
345{
349 char *errmsg = NULL;
350 char *ret;
351
352 conn_opts = PQconninfoParse(conninfo, &errmsg);
353 if (conn_opts == NULL)
354 {
355 pg_log_error("could not parse connection string: %s", errmsg);
357 return NULL;
358 }
359
361 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
362 {
363 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
364 {
365 if (strcmp(conn_opt->keyword, "dbname") == 0)
366 {
367 if (dbname)
368 *dbname = pg_strdup(conn_opt->val);
369 continue;
370 }
371 appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
372 }
373 }
374
375 ret = pg_strdup(buf->data);
376
379
380 return ret;
381}
382
383/*
384 * Build a subscriber connection string. Only a few parameters are supported
385 * since it starts a server with restricted access.
386 */
387static char *
389{
391 char *ret;
392
393 appendConnStrItem(buf, "port", opt->sub_port);
394#if !defined(WIN32)
395 appendConnStrItem(buf, "host", opt->socket_dir);
396#endif
397 if (opt->sub_username != NULL)
398 appendConnStrItem(buf, "user", opt->sub_username);
399 appendConnStrItem(buf, "fallback_application_name", progname);
400
401 ret = pg_strdup(buf->data);
402
404
405 return ret;
406}
407
408/*
409 * Verify if a PostgreSQL binary (progname) is available in the same directory as
410 * pg_createsubscriber and it has the same version. It returns the absolute
411 * path of the progname.
412 */
413static char *
414get_exec_path(const char *argv0, const char *progname)
415{
416 char *versionstr;
417 char *exec_path;
418 int ret;
419
420 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
423
424 if (ret < 0)
425 {
426 char full_path[MAXPGPATH];
427
428 if (find_my_exec(argv0, full_path) < 0)
430
431 if (ret == -1)
432 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
433 progname, "pg_createsubscriber", full_path);
434 else
435 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
436 progname, full_path, "pg_createsubscriber");
437 }
438
439 pg_log_debug("%s path is: %s", progname, exec_path);
440
441 return exec_path;
442}
443
444/*
445 * Is it a cluster directory? These are only preliminary checks and are far
446 * from exhaustive. If the directory is not a clone of the publisher, a later
447 * step will eventually fail.
448 */
449static void
451{
452 struct stat statbuf;
453 uint32 major_version;
454 char *version_str;
455
456 pg_log_info("checking if directory \"%s\" is a cluster data directory",
457 datadir);
458
459 if (stat(datadir, &statbuf) != 0)
460 {
461 if (errno == ENOENT)
462 pg_fatal("data directory \"%s\" does not exist", datadir);
463 else
464 pg_fatal("could not access directory \"%s\": %m", datadir);
465 }
466
467 /*
468 * Retrieve the contents of this cluster's PG_VERSION. We require
469 * compatibility with the same major version as the one this tool is
470 * compiled with.
471 */
473 if (major_version != PG_MAJORVERSION_NUM)
474 {
475 pg_log_error("data directory is of wrong version");
476 pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
477 "PG_VERSION", version_str, PG_MAJORVERSION);
478 exit(1);
479 }
480}
481
482/*
483 * Append database name into a base connection string.
484 *
485 * dbname is the only parameter that changes so it is not included in the base
486 * connection string. This function concatenates dbname to build a "real"
487 * connection string.
488 */
489static char *
490concat_conninfo_dbname(const char *conninfo, const char *dbname)
491{
493 char *ret;
494
495 Assert(conninfo != NULL);
496
497 appendPQExpBufferStr(buf, conninfo);
498 appendConnStrItem(buf, "dbname", dbname);
499
500 ret = pg_strdup(buf->data);
502
503 return ret;
504}
505
506/*
507 * Store publication and subscription information.
508 *
509 * If publication, replication slot and subscription names were specified,
510 * store them here. Otherwise, a generated name will be assigned to the object in
511 * setup_publisher().
512 */
513static struct LogicalRepInfo *
515 const char *pub_base_conninfo,
516 const char *sub_base_conninfo)
517{
518 struct LogicalRepInfo *dbinfo;
522 int i = 0;
523
524 dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
525
526 if (num_pubs > 0)
527 pubcell = opt->pub_names.head;
528 if (num_subs > 0)
529 subcell = opt->sub_names.head;
530 if (num_replslots > 0)
532
533 for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
534 {
535 char *conninfo;
536
537 /* Fill publisher attributes */
538 conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
539 dbinfo[i].pubconninfo = conninfo;
540 dbinfo[i].dbname = cell->val;
541 if (num_pubs > 0)
542 dbinfo[i].pubname = pubcell->val;
543 else
544 dbinfo[i].pubname = NULL;
545 if (num_replslots > 0)
546 dbinfo[i].replslotname = replslotcell->val;
547 else
548 dbinfo[i].replslotname = NULL;
549 dbinfo[i].made_replslot = false;
550 dbinfo[i].made_publication = false;
551 /* Fill subscriber attributes */
552 conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
553 dbinfo[i].subconninfo = conninfo;
554 if (num_subs > 0)
555 dbinfo[i].subname = subcell->val;
556 else
557 dbinfo[i].subname = NULL;
558 /* Other fields will be filled later */
559
560 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
561 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
562 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
563 dbinfo[i].pubconninfo);
564 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
565 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
566 dbinfo[i].subconninfo,
567 dbinfos.two_phase ? "true" : "false");
568
569 if (num_pubs > 0)
570 pubcell = pubcell->next;
571 if (num_subs > 0)
572 subcell = subcell->next;
573 if (num_replslots > 0)
575
576 i++;
577 }
578
579 return dbinfo;
580}
581
582/*
583 * Open a new connection. If exit_on_error is true, a failure is treated as
584 * fatal and the program exits immediately; otherwise NULL is returned.
585 */
586static PGconn *
587connect_database(const char *conninfo, bool exit_on_error)
588{
589 PGconn *conn;
590 PGresult *res;
591
592 conn = PQconnectdb(conninfo);
594 {
595 pg_log_error("connection to database failed: %s",
597 PQfinish(conn);
598
599 if (exit_on_error)
600 exit(1);
601 return NULL;
602 }
603
604 /* Secure search_path */
607 {
608 pg_log_error("could not clear \"search_path\": %s",
610 PQclear(res);
611 PQfinish(conn);
612
613 if (exit_on_error)
614 exit(1);
615 return NULL;
616 }
617 PQclear(res);
618
619 return conn;
620}
621
622/*
623 * Close the connection. If exit_on_error is true, it has an undesired
624 * condition and it should exit immediately.
625 */
626static void
627disconnect_database(PGconn *conn, bool exit_on_error)
628{
629 Assert(conn != NULL);
630
631 PQfinish(conn);
632
633 if (exit_on_error)
634 exit(1);
635}
636
637/*
638 * Obtain the system identifier using the provided connection. It will be used
639 * to compare if a data directory is a clone of another one.
640 */
641static uint64
642get_primary_sysid(const char *conninfo)
643{
644 PGconn *conn;
645 PGresult *res;
647
648 pg_log_info("getting system identifier from publisher");
649
650 conn = connect_database(conninfo, true);
651
652 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
654 {
655 pg_log_error("could not get system identifier: %s",
658 }
659 if (PQntuples(res) != 1)
660 {
661 pg_log_error("could not get system identifier: got %d rows, expected %d row",
662 PQntuples(res), 1);
664 }
665
666 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
667
668 pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
669
670 PQclear(res);
672
673 return sysid;
674}
675
676/*
677 * Obtain the system identifier from control file. It will be used to compare
678 * if a data directory is a clone of another one. This routine is used locally
679 * and avoids a connection.
680 */
681static uint64
683{
685 bool crc_ok;
687
688 pg_log_info("getting system identifier from subscriber");
689
691 if (!crc_ok)
692 pg_fatal("control file appears to be corrupt");
693
694 sysid = cf->system_identifier;
695
696 pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
697
698 pg_free(cf);
699
700 return sysid;
701}
702
703/*
704 * Modify the system identifier. Since a standby server preserves the system
705 * identifier, it makes sense to change it to avoid situations in which WAL
706 * files from one of the systems might be used in the other one.
707 */
708static void
710{
712 bool crc_ok;
713 struct timeval tv;
714
715 char *out_file;
716 char *cmd_str;
717
718 pg_log_info("modifying system identifier of subscriber");
719
721 if (!crc_ok)
722 pg_fatal("control file appears to be corrupt");
723
724 /*
725 * Select a new system identifier.
726 *
727 * XXX this code was extracted from BootStrapXLOG().
728 */
729 gettimeofday(&tv, NULL);
730 cf->system_identifier = ((uint64) tv.tv_sec) << 32;
731 cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
732 cf->system_identifier |= getpid() & 0xFFF;
733
734 if (dry_run)
735 pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
736 cf->system_identifier);
737 else
738 {
740 pg_log_info("system identifier is %" PRIu64 " on subscriber",
741 cf->system_identifier);
742 }
743
744 if (dry_run)
745 pg_log_info("dry-run: would run pg_resetwal on the subscriber");
746 else
747 pg_log_info("running pg_resetwal on the subscriber");
748
749 /*
750 * Redirecting the output to the logfile if specified. Since the output
751 * would be very short, around one line, we do not provide a separate file
752 * for it; it's done as a part of the server log.
753 */
754 if (opt->log_dir)
756 else
758
759 cmd_str = psprintf("\"%s\" -D \"%s\" >> \"%s\"", pg_resetwal_path,
761 if (opt->log_dir)
763
764 pg_log_debug("pg_resetwal command is: %s", cmd_str);
765
766 if (!dry_run)
767 {
768 int rc = system(cmd_str);
769
770 if (rc == 0)
771 pg_log_info("successfully reset WAL on the subscriber");
772 else
773 pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
774 }
775
776 pg_free(cf);
778}
779
780/*
781 * Generate an object name using a prefix, database oid and a random integer.
782 * It is used in case the user does not specify an object name (publication,
783 * subscription, replication slot).
784 */
785static char *
787{
788 PGresult *res;
789 Oid oid;
790 uint32 rand;
791 char *objname;
792
793 res = PQexec(conn,
794 "SELECT oid FROM pg_catalog.pg_database "
795 "WHERE datname = pg_catalog.current_database()");
797 {
798 pg_log_error("could not obtain database OID: %s",
801 }
802
803 if (PQntuples(res) != 1)
804 {
805 pg_log_error("could not obtain database OID: got %d rows, expected %d row",
806 PQntuples(res), 1);
808 }
809
810 /* Database OID */
811 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
812
813 PQclear(res);
814
815 /* Random unsigned integer */
817
818 /*
819 * Build the object name. The name must not exceed NAMEDATALEN - 1. The
820 * current naming scheme uses a maximum of 40 characters (20 + 10 + 1 + 8 +
821 * '\0').
822 */
823 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
824
825 return objname;
826}
827
828/*
829 * Does the publication exist in the specified database?
830 */
831static bool
832find_publication(PGconn *conn, const char *pubname, const char *dbname)
833{
835 PGresult *res;
836 bool found = false;
837 char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
838
840 "SELECT 1 FROM pg_catalog.pg_publication "
841 "WHERE pubname = %s",
843 res = PQexec(conn, str->data);
845 {
846 pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
847 pubname, dbname, PQerrorMessage(conn));
849 }
850
851 if (PQntuples(res) == 1)
852 found = true;
853
854 PQclear(res);
857
858 return found;
859}
860
861/*
862 * Create the publications and replication slots in preparation for logical
863 * replication. Returns the LSN from latest replication slot. It will be the
864 * replication start point that is used to adjust the subscriptions (see
865 * set_replication_progress).
866 */
867static char *
869{
870 char *lsn = NULL;
871
872 pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
873
874 for (int i = 0; i < num_dbs; i++)
875 {
876 PGconn *conn;
877 char *genname = NULL;
878
879 conn = connect_database(dbinfo[i].pubconninfo, true);
880
881 /*
882 * If an object name was not specified as a command-line option, assign
883 * a generated object name. The replication slot has a different rule.
884 * The subscription name is assigned to the replication slot name if
885 * no replication slot is specified. It follows the same rule as
886 * CREATE SUBSCRIPTION.
887 */
888 if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
890 if (num_pubs == 0)
891 dbinfo[i].pubname = pg_strdup(genname);
892 if (num_subs == 0)
893 dbinfo[i].subname = pg_strdup(genname);
894 if (num_replslots == 0)
895 dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
896
897 if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
898 {
899 /* Reuse existing publication on publisher. */
900 pg_log_info("use existing publication \"%s\" in database \"%s\"",
901 dbinfo[i].pubname, dbinfo[i].dbname);
902 /* Don't remove pre-existing publication if an error occurs. */
903 dbinfo[i].made_publication = false;
904 }
905 else
906 {
907 /*
908 * Create publication on publisher. This step should be executed
909 * *before* promoting the subscriber to avoid any transactions
910 * between consistent LSN and the new publication rows (such
911 * transactions wouldn't see the new publication rows resulting in
912 * an error).
913 */
914 create_publication(conn, &dbinfo[i]);
915 }
916
917 /* Create replication slot on publisher */
918 if (lsn)
919 pg_free(lsn);
920 lsn = create_logical_replication_slot(conn, &dbinfo[i]);
921 if (lsn == NULL && !dry_run)
922 exit(1);
923
924 /*
925 * Since we are using the LSN returned by the last replication slot as
926 * recovery_target_lsn, this LSN is ahead of the current WAL position
927 * and the recovery waits until the publisher writes a WAL record to
928 * reach the target and ends the recovery. On idle systems, this wait
929 * time is unpredictable and could lead to failure in promoting the
930 * subscriber. To avoid that, insert a harmless WAL record.
931 */
932 if (i == num_dbs - 1 && !dry_run)
933 {
934 PGresult *res;
935
936 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
938 {
939 pg_log_error("could not write an additional WAL record: %s",
942 }
943 PQclear(res);
944 }
945
947 }
948
949 return lsn;
950}
951
952/*
953 * Is recovery still in progress?
954 */
955static bool
957{
958 PGresult *res;
959 int ret;
960
961 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
962
964 {
965 pg_log_error("could not obtain recovery progress: %s",
968 }
969
970
971 ret = strcmp("t", PQgetvalue(res, 0, 0));
972
973 PQclear(res);
974
975 return ret == 0;
976}
977
978static void
980{
981 char timestamp[128];
982 struct timeval tval;
983 time_t now;
984 struct tm tmbuf;
985
986 /* Generate timestamp */
988 now = tval.tv_sec;
989
990 strftime(timestamp, sizeof(timestamp), "%Y%m%dT%H%M%S",
991 localtime_r(&now, &tmbuf));
992
993 /* Append milliseconds */
995 sizeof(timestamp) - strlen(timestamp), ".%03u",
996 (unsigned int) (tval.tv_usec / 1000));
997
998 /* Build timestamp directory path */
1000
1001 /* Create base directory (ignore if exists) */
1003 pg_fatal("could not create directory \"%s\": %m", log_basedir);
1004
1005 /* Create a timestamp-named subdirectory under the base directory */
1007 pg_fatal("could not create directory \"%s\": %m", logdir);
1008}
1009
1010/*
1011 * Is the primary server ready for logical replication?
1012 *
1013 * XXX Does it not allow a synchronous replica?
1014 */
1015static void
1016check_publisher(const struct LogicalRepInfo *dbinfo)
1017{
1018 PGconn *conn;
1019 PGresult *res;
1020 bool failed = false;
1021
1022 char *wal_level;
1023 int max_repslots;
1024 int cur_repslots;
1025 int max_walsenders;
1026 int cur_walsenders;
1029
1030 pg_log_info("checking settings on publisher");
1031
1032 conn = connect_database(dbinfo[0].pubconninfo, true);
1033
1034 /*
1035 * If the primary server is in recovery (i.e. cascading replication),
1036 * objects (publication) cannot be created because it is read only.
1037 */
1039 {
1040 pg_log_error("primary server cannot be in recovery");
1042 }
1043
1044 /*------------------------------------------------------------------------
1045 * Logical replication requires a few parameters to be set on publisher.
1046 * Since these parameters are not a requirement for physical replication,
1047 * we should check them to make sure it won't fail.
1048 *
1049 * - wal_level >= replica
1050 * - max_replication_slots >= current + number of dbs to be converted
1051 * - max_wal_senders >= current + number of dbs to be converted
1052 * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
1053 * -----------------------------------------------------------------------
1054 */
1055 res = PQexec(conn,
1056 "SELECT pg_catalog.current_setting('wal_level'),"
1057 " pg_catalog.current_setting('max_replication_slots'),"
1058 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
1059 " pg_catalog.current_setting('max_wal_senders'),"
1060 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
1061 " pg_catalog.current_setting('max_prepared_transactions'),"
1062 " pg_catalog.current_setting('max_slot_wal_keep_size')");
1063
1064 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1065 {
1066 pg_log_error("could not obtain publisher settings: %s",
1069 }
1070
1071 wal_level = pg_strdup(PQgetvalue(res, 0, 0));
1072 max_repslots = atoi(PQgetvalue(res, 0, 1));
1073 cur_repslots = atoi(PQgetvalue(res, 0, 2));
1074 max_walsenders = atoi(PQgetvalue(res, 0, 3));
1075 cur_walsenders = atoi(PQgetvalue(res, 0, 4));
1078
1079 PQclear(res);
1080
1081 pg_log_debug("publisher: wal_level: %s", wal_level);
1082 pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
1083 pg_log_debug("publisher: current replication slots: %d", cur_repslots);
1084 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
1085 pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
1086 pg_log_debug("publisher: max_prepared_transactions: %d",
1088 pg_log_debug("publisher: max_slot_wal_keep_size: %s",
1090
1091 disconnect_database(conn, false);
1092
1093 if (strcmp(wal_level, "minimal") == 0)
1094 {
1095 pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
1096 failed = true;
1097 }
1098
1100 {
1101 pg_log_error("publisher requires %d replication slots, but only %d remain",
1103 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1104 "max_replication_slots", cur_repslots + num_dbs);
1105 failed = true;
1106 }
1107
1109 {
1110 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
1112 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1113 "max_wal_senders", cur_walsenders + num_dbs);
1114 failed = true;
1115 }
1116
1118 {
1119 pg_log_warning("two_phase option will not be enabled for replication slots");
1120 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
1121 "Prepared transactions will be replicated at COMMIT PREPARED.");
1122 pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
1123 }
1124
1125 /*
1126 * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
1127 * is set to a non-default value, it may cause replication failures due to
1128 * required WAL files being prematurely removed.
1129 */
1130 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
1131 {
1132 pg_log_warning("required WAL could be removed from the publisher");
1133 pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1134 "max_slot_wal_keep_size");
1135 }
1136
1138
1139 if (failed)
1140 exit(1);
1141}
1142
1143/*
1144 * Is the standby server ready for logical replication?
1145 *
1146 * XXX Does it not allow a time-delayed replica?
1147 *
1148 * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1149 * is S, it cannot detect there is a replica (server C) because server S starts
1150 * accepting only local connections and server C cannot connect to it. Hence,
1151 * there is not a reliable way to provide a suitable error saying the server C
1152 * will be broken at the end of this process (due to pg_resetwal).
1153 */
1154static void
1156{
1157 PGconn *conn;
1158 PGresult *res;
1159 bool failed = false;
1160
1161 int max_lrworkers;
1162 int max_replorigins;
1163 int max_wprocs;
1164
1165 pg_log_info("checking settings on subscriber");
1166
1167 conn = connect_database(dbinfo[0].subconninfo, true);
1168
1169 /* The target server must be a standby */
1171 {
1172 pg_log_error("target server must be a standby");
1174 }
1175
1176 /*------------------------------------------------------------------------
1177 * Logical replication requires a few parameters to be set on subscriber.
1178 * Since these parameters are not a requirement for physical replication,
1179 * we should check it to make sure it won't fail.
1180 *
1181 * - max_active_replication_origins >= number of dbs to be converted
1182 * - max_logical_replication_workers >= number of dbs to be converted
1183 * - max_worker_processes >= 1 + number of dbs to be converted
1184 *------------------------------------------------------------------------
1185 */
1186 res = PQexec(conn,
1187 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1188 "'max_logical_replication_workers', "
1189 "'max_active_replication_origins', "
1190 "'max_worker_processes', "
1191 "'primary_slot_name') "
1192 "ORDER BY name");
1193
1194 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1195 {
1196 pg_log_error("could not obtain subscriber settings: %s",
1199 }
1200
1201 max_replorigins = atoi(PQgetvalue(res, 0, 0));
1202 max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1203 max_wprocs = atoi(PQgetvalue(res, 2, 0));
1204 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1206
1207 pg_log_debug("subscriber: max_logical_replication_workers: %d",
1209 pg_log_debug("subscriber: max_active_replication_origins: %d", max_replorigins);
1210 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1212 pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1213
1214 PQclear(res);
1215
1216 disconnect_database(conn, false);
1217
1219 {
1220 pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1222 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1223 "max_active_replication_origins", num_dbs);
1224 failed = true;
1225 }
1226
1227 if (max_lrworkers < num_dbs)
1228 {
1229 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1231 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1232 "max_logical_replication_workers", num_dbs);
1233 failed = true;
1234 }
1235
1236 if (max_wprocs < num_dbs + 1)
1237 {
1238 pg_log_error("subscriber requires %d worker processes, but only %d remain",
1239 num_dbs + 1, max_wprocs);
1240 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1241 "max_worker_processes", num_dbs + 1);
1242 failed = true;
1243 }
1244
1245 if (failed)
1246 exit(1);
1247}
1248
1249/*
1250 * Drop a specified subscription. This is to avoid duplicate subscriptions on
1251 * the primary (publisher node) and the newly created subscriber. We
1252 * shouldn't drop the associated slot as that would be used by the publisher
1253 * node.
1254 */
1255static void
1257{
1259 PGresult *res;
1260
1261 Assert(conn != NULL);
1262
1263 /*
1264 * Construct a query string. These commands are allowed to be executed
1265 * within a transaction.
1266 */
1267 appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1268 subname);
1269 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1270 subname);
1271 appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1272
1273 if (dry_run)
1274 pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
1275 subname, dbname);
1276 else
1277 {
1278 pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1279 subname, dbname);
1280
1281 res = PQexec(conn, query->data);
1282
1283 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1284 {
1285 pg_log_error("could not drop subscription \"%s\": %s",
1288 }
1289
1290 PQclear(res);
1291 }
1292
1293 destroyPQExpBuffer(query);
1294}
1295
1296/*
1297 * Retrieve and drop the pre-existing subscriptions.
1298 */
1299static void
1301 const struct LogicalRepInfo *dbinfo)
1302{
1304 char *dbname;
1305 PGresult *res;
1306
1307 Assert(conn != NULL);
1308
1309 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1310
1311 appendPQExpBuffer(query,
1312 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1313 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1314 "WHERE d.datname = %s",
1315 dbname);
1316 res = PQexec(conn, query->data);
1317
1318 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1319 {
1320 pg_log_error("could not obtain pre-existing subscriptions: %s",
1323 }
1324
1325 for (int i = 0; i < PQntuples(res); i++)
1327 dbinfo->dbname);
1328
1329 PQclear(res);
1330 destroyPQExpBuffer(query);
1332}
1333
1334/*
1335 * Create the subscriptions, adjust the initial location for logical
1336 * replication and enable the subscriptions. That's the last step for logical
1337 * replication setup.
1338 */
1339static void
1341{
1342 for (int i = 0; i < num_dbs; i++)
1343 {
1344 PGconn *conn;
1345
1346 /* Connect to subscriber. */
1347 conn = connect_database(dbinfo[i].subconninfo, true);
1348
1349 /*
1350 * We don't need the pre-existing subscriptions on the newly formed
1351 * subscriber. They can connect to other publisher nodes and either
1352 * get some unwarranted data or can lead to ERRORs in connecting to
1353 * such nodes.
1354 */
1356
1357 /* Check and drop the required publications in the given database. */
1359
1360 create_subscription(conn, &dbinfo[i]);
1361
1362 /* Set the replication progress to the correct LSN */
1364
1365 /* Enable subscription */
1366 enable_subscription(conn, &dbinfo[i]);
1367
1368 disconnect_database(conn, false);
1369 }
1370}
1371
1372/*
1373 * Write the required recovery parameters.
1374 */
1375static void
1376setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1377{
1378 PGconn *conn;
1380
1381 /*
1382 * Despite of the recovery parameters will be written to the subscriber,
1383 * use a publisher connection. The primary_conninfo is generated using the
1384 * connection settings.
1385 */
1386 conn = connect_database(dbinfo[0].pubconninfo, true);
1387
1388 /*
1389 * Write recovery parameters.
1390 *
1391 * The subscriber is not running yet. In dry run mode, the recovery
1392 * parameters *won't* be written. An invalid LSN is used for printing
1393 * purposes. Additional recovery parameters are added here. It avoids
1394 * unexpected behavior such as end of recovery as soon as a consistent
1395 * state is reached (recovery_target) and failure due to multiple recovery
1396 * targets (name, time, xid, LSN).
1397 */
1399 appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1401 "recovery_target_timeline = 'latest'\n");
1402
1403 /*
1404 * Set recovery_target_inclusive = false to avoid reapplying the
1405 * transaction committed at 'lsn' after subscription is enabled. This is
1406 * because the provided 'lsn' is also used as the replication start point
1407 * for the subscription. So, the server can send the transaction committed
1408 * at that 'lsn' after replication is started which can lead to applying
1409 * the same transaction twice if we keep recovery_target_inclusive = true.
1410 */
1412 "recovery_target_inclusive = false\n");
1414 "recovery_target_action = promote\n");
1415 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1416 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1417 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1418
1419 if (dry_run)
1420 {
1421 appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
1423 "recovery_target_lsn = '%X/%08X'\n",
1425 }
1426 else
1427 {
1428 appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1429 lsn);
1430 }
1431
1432 pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1433
1434 if (!dry_run)
1435 {
1437 FILE *fd;
1438
1439 /* Write the recovery parameters to INCLUDED_CONF_FILE */
1442 fd = fopen(conf_filename, "w");
1443 if (fd == NULL)
1444 pg_fatal("could not open file \"%s\": %m", conf_filename);
1445
1447 pg_fatal("could not write to file \"%s\": %m", conf_filename);
1448
1449 fclose(fd);
1450 recovery_params_set = true;
1451
1452 /* Include conditionally the recovery parameters. */
1455 "include_if_exists '" INCLUDED_CONF_FILE "'\n");
1457 }
1458
1459 disconnect_database(conn, false);
1460}
1461
1462/*
1463 * Drop physical replication slot on primary if the standby was using it. After
1464 * the transformation, it has no use.
1465 *
1466 * XXX we might not fail here. Instead, we provide a warning so the user
1467 * eventually drops this replication slot later.
1468 */
1469static void
1470drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1471{
1472 PGconn *conn;
1473
1474 /* Replication slot does not exist, do nothing */
1475 if (!primary_slot_name)
1476 return;
1477
1478 conn = connect_database(dbinfo[0].pubconninfo, false);
1479 if (conn != NULL)
1480 {
1481 drop_replication_slot(conn, &dbinfo[0], slotname);
1482 disconnect_database(conn, false);
1483 }
1484 else
1485 {
1486 pg_log_warning("could not drop replication slot \"%s\" on primary",
1487 slotname);
1488 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1489 }
1490}
1491
1492/*
1493 * Drop failover replication slots on subscriber. After the transformation,
1494 * they have no use.
1495 *
1496 * XXX We do not fail here. Instead, we provide a warning so the user can drop
1497 * them later.
1498 */
1499static void
1501{
1502 PGconn *conn;
1503 PGresult *res;
1504
1505 conn = connect_database(dbinfo[0].subconninfo, false);
1506 if (conn != NULL)
1507 {
1508 /* Get failover replication slot names */
1509 res = PQexec(conn,
1510 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1511
1512 if (PQresultStatus(res) == PGRES_TUPLES_OK)
1513 {
1514 /* Remove failover replication slots from subscriber */
1515 for (int i = 0; i < PQntuples(res); i++)
1516 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1517 }
1518 else
1519 {
1520 pg_log_warning("could not obtain failover replication slot information: %s",
1522 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1523 }
1524
1525 PQclear(res);
1526 disconnect_database(conn, false);
1527 }
1528 else
1529 {
1530 pg_log_warning("could not drop failover replication slot");
1531 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1532 }
1533}
1534
1535/*
1536 * Create a logical replication slot and return an LSN.
1537 *
1538 * CreateReplicationSlot() is not used because it does not provide the one-row
1539 * result set that contains the LSN.
1540 */
1541static char *
1543{
1545 PGresult *res = NULL;
1546 const char *slot_name = dbinfo->replslotname;
1547 char *slot_name_esc;
1548 char *lsn = NULL;
1549
1550 Assert(conn != NULL);
1551
1552 if (dry_run)
1553 pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1554 slot_name, dbinfo->dbname);
1555 else
1556 pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
1557 slot_name, dbinfo->dbname);
1558
1559 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1560
1562 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1564 dbinfos.two_phase ? "true" : "false");
1565
1567
1568 pg_log_debug("command is: %s", str->data);
1569
1570 if (!dry_run)
1571 {
1572 res = PQexec(conn, str->data);
1573 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1574 {
1575 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1576 slot_name, dbinfo->dbname,
1578 PQclear(res);
1580 return NULL;
1581 }
1582
1583 lsn = pg_strdup(PQgetvalue(res, 0, 0));
1584 PQclear(res);
1585 }
1586
1587 /* For cleanup purposes */
1588 dbinfo->made_replslot = true;
1589
1591
1592 return lsn;
1593}
1594
1595static void
1597 const char *slot_name)
1598{
1600 char *slot_name_esc;
1601 PGresult *res;
1602
1603 Assert(conn != NULL);
1604
1605 if (dry_run)
1606 pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1607 slot_name, dbinfo->dbname);
1608 else
1609 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1610 slot_name, dbinfo->dbname);
1611
1612 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1613
1614 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1615
1617
1618 pg_log_debug("command is: %s", str->data);
1619
1620 if (!dry_run)
1621 {
1622 res = PQexec(conn, str->data);
1623 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1624 {
1625 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1626 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1627 dbinfo->made_replslot = false; /* don't try again. */
1628 }
1629
1630 PQclear(res);
1631 }
1632
1634}
1635
/*
 * Report a pg_ctl failure and exit.
 *
 * 'rc' is the status obtained from running 'pg_ctl_cmd'. A zero status means
 * success and this function simply returns. Any other status is decoded with
 * the wait-status macros, logged together with the failed command, and the
 * program exits with status 1.
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	/* Nothing to report when pg_ctl succeeded */
	if (rc == 0)
		return;

	if (WIFEXITED(rc))
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	else if (WIFSIGNALED(rc))
	{
#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X",
					 WTERMSIG(rc));
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
#endif
	}
	else
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);

	/* Always show the command line that failed before bailing out */
	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1668
1669static void
1672{
1674 int rc;
1675
1676 appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1678 appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1679
1680 /* Prevent unintended slot invalidation */
1681 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1682
1684 {
1685 appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1686#if !defined(WIN32)
1687
1688 /*
1689 * An empty listen_addresses list means the server does not listen on
1690 * any IP interfaces; only Unix-domain sockets can be used to connect
1691 * to the server. Prevent external connections to minimize the chance
1692 * of failure.
1693 */
1694 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1695 if (opt->socket_dir)
1696 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1697 opt->socket_dir);
1699#endif
1700 }
1701 if (opt->config_file != NULL)
1702 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1703 opt->config_file);
1704
1705 /* Suppress to start logical replication if requested */
1707 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1708
1709 if (opt->log_dir)
1711
1712 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1713 rc = system(pg_ctl_cmd->data);
1714 pg_ctl_status(pg_ctl_cmd->data, rc);
1715 standby_running = true;
1717 pg_log_info("server was started");
1718}
1719
1720static void
1722{
1723 char *pg_ctl_cmd;
1724 int rc;
1725
1726 pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1727 datadir);
1728 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1729 rc = system(pg_ctl_cmd);
1731 standby_running = false;
1732 pg_log_info("server was stopped");
1733}
1734
1735/*
1736 * Returns after the server finishes the recovery process.
1737 *
1738 * If recovery_timeout option is set, terminate abnormally without finishing
1739 * the recovery process. By default, it waits forever.
1740 *
1741 * XXX Is the recovery process still in progress? When recovery process has a
1742 * better progress reporting mechanism, it should be added here.
1743 */
1744static void
1745wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1746{
1747 PGconn *conn;
1748 bool ready = false;
1749 int timer = 0;
1750
1751 pg_log_info("waiting for the target server to reach the consistent state");
1752
1753 conn = connect_database(conninfo, true);
1754
1755 for (;;)
1756 {
1757 /* Did the recovery process finish? We're done if so. */
1759 {
1760 ready = true;
1761 recovery_ended = true;
1762 break;
1763 }
1764
1765 /* Bail out after recovery_timeout seconds if this option is set */
1766 if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1767 {
1769 pg_log_error("recovery timed out");
1771 }
1772
1773 /* Keep waiting */
1776 }
1777
1778 disconnect_database(conn, false);
1779
1780 if (!ready)
1781 pg_fatal("server did not end recovery");
1782
1783 pg_log_info("target server reached the consistent state");
1784 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1785}
1786
1787/*
1788 * Create a publication that includes all tables in the database.
1789 */
1790static void
1792{
1794 PGresult *res;
1795 char *ipubname_esc;
1796 char *spubname_esc;
1797
1798 Assert(conn != NULL);
1799
1801 spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1802
1803 /* Check if the publication already exists */
1805 "SELECT 1 FROM pg_catalog.pg_publication "
1806 "WHERE pubname = %s",
1807 spubname_esc);
1808 res = PQexec(conn, str->data);
1809 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1810 {
1811 pg_log_error("could not obtain publication information: %s",
1814 }
1815
1816 if (PQntuples(res) == 1)
1817 {
1818 /*
1819 * Unfortunately, if it reaches this code path, it will always fail
1820 * (unless you decide to change the existing publication name). That's
1821 * bad but it is very unlikely that the user will choose a name with
1822 * pg_createsubscriber_ prefix followed by the exact database oid and
1823 * a random number.
1824 */
1825 pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1826 pg_log_error_hint("Consider renaming this publication before continuing.");
1828 }
1829
1830 PQclear(res);
1832
1833 if (dry_run)
1834 pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
1835 dbinfo->pubname, dbinfo->dbname);
1836 else
1837 pg_log_info("creating publication \"%s\" in database \"%s\"",
1838 dbinfo->pubname, dbinfo->dbname);
1839
1840 appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1841 ipubname_esc);
1842
1843 pg_log_debug("command is: %s", str->data);
1844
1845 if (!dry_run)
1846 {
1847 res = PQexec(conn, str->data);
1848 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1849 {
1850 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1851 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1853 }
1854 PQclear(res);
1855 }
1856
1857 /* For cleanup purposes */
1858 dbinfo->made_publication = true;
1859
1863}
1864
1865/*
1866 * Drop the specified publication in the given database.
1867 */
1868static void
1869drop_publication(PGconn *conn, const char *pubname, const char *dbname,
1870 bool *made_publication)
1871{
1873 PGresult *res;
1874 char *pubname_esc;
1875
1876 Assert(conn != NULL);
1877
1878 pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1879
1880 if (dry_run)
1881 pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
1882 pubname, dbname);
1883 else
1884 pg_log_info("dropping publication \"%s\" in database \"%s\"",
1885 pubname, dbname);
1886
1887 appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1888
1890
1891 pg_log_debug("command is: %s", str->data);
1892
1893 if (!dry_run)
1894 {
1895 res = PQexec(conn, str->data);
1896 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1897 {
1898 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1899 pubname, dbname, PQresultErrorMessage(res));
1900 *made_publication = false; /* don't try again. */
1901
1902 /*
1903 * Don't disconnect and exit here. This routine is used by primary
1904 * (cleanup publication / replication slot due to an error) and
1905 * subscriber (remove the replicated publications). In both cases,
1906 * it can continue and provide instructions for the user to remove
1907 * it later if cleanup fails.
1908 */
1909 }
1910 PQclear(res);
1911 }
1912
1914}
1915
1916/*
1917 * Retrieve and drop the publications.
1918 *
1919 * Publications copied during physical replication remain on the subscriber
1920 * after promotion. If --clean=publications is specified, drop all existing
1921 * publications in the subscriber database. Otherwise, only drop publications
1922 * that were created by pg_createsubscriber during this operation.
1923 */
1924static void
1926{
1927 PGresult *res;
1929
1930 Assert(conn != NULL);
1931
1932 if (drop_all_pubs)
1933 {
1934 pg_log_info("dropping all existing publications in database \"%s\"",
1935 dbinfo->dbname);
1936
1937 /* Fetch all publication names */
1938 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1939 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1940 {
1941 pg_log_error("could not obtain publication information: %s",
1943 PQclear(res);
1945 }
1946
1947 /* Drop each publication */
1948 for (int i = 0; i < PQntuples(res); i++)
1949 drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1950 &dbinfo->made_publication);
1951
1952 PQclear(res);
1953 }
1954 else
1955 {
1956 /* Drop publication only if it was created by this tool */
1957 if (dbinfo->made_publication)
1958 {
1959 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1960 &dbinfo->made_publication);
1961 }
1962 else
1963 {
1964 if (dry_run)
1965 pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
1966 dbinfo->pubname, dbinfo->dbname);
1967 else
1968 pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
1969 dbinfo->pubname, dbinfo->dbname);
1970 }
1971 }
1972}
1973
1974/*
1975 * Create a subscription with some predefined options.
1976 *
1977 * A replication slot was already created in a previous step. Let's use it. It
1978 * is not required to copy data. The subscription will be created but it will
1979 * not be enabled now. That's because the replication progress must be set and
1980 * the replication origin name (one of the function arguments) contains the
1981 * subscription OID in its name. Once the subscription is created,
1982 * set_replication_progress() can obtain the chosen origin name and set up its
1983 * initial location.
1984 */
1985static void
1987{
1989 PGresult *res;
1990 char *pubname_esc;
1991 char *subname_esc;
1992 char *pubconninfo_esc;
1993 char *replslotname_esc;
1994
1995 Assert(conn != NULL);
1996
2001
2002 if (dry_run)
2003 pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
2004 dbinfo->subname, dbinfo->dbname);
2005 else
2006 pg_log_info("creating subscription \"%s\" in database \"%s\"",
2007 dbinfo->subname, dbinfo->dbname);
2008
2010 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
2011 "WITH (create_slot = false, enabled = false, "
2012 "slot_name = %s, copy_data = false, two_phase = %s)",
2014 dbinfos.two_phase ? "true" : "false");
2015
2020
2021 pg_log_debug("command is: %s", str->data);
2022
2023 if (!dry_run)
2024 {
2025 res = PQexec(conn, str->data);
2026 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2027 {
2028 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
2029 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
2031 }
2032 PQclear(res);
2033 }
2034
2036}
2037
2038/*
2039 * Sets the replication progress to the consistent LSN.
2040 *
2041 * The subscriber caught up to the consistent LSN provided by the last
2042 * replication slot that was created. The goal is to set up the initial
2043 * location for logical replication, which is the exact LSN at which the
2044 * subscriber was promoted. Once the subscription is enabled it will start
2045 * streaming from that location onwards. In dry run mode, the subscription OID
2046 * and LSN are set to invalid values for printing purposes.
2047 */
2048static void
2049set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
2050{
2052 PGresult *res;
2053 Oid suboid;
2054 char *subname;
2055 char *dbname;
2056 char *originname;
2057 char *lsnstr;
2058
2059 Assert(conn != NULL);
2060
2061 subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
2062 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
2063
2065 "SELECT s.oid FROM pg_catalog.pg_subscription s "
2066 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
2067 "WHERE s.subname = %s AND d.datname = %s",
2068 subname, dbname);
2069
2070 res = PQexec(conn, str->data);
2071 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2072 {
2073 pg_log_error("could not obtain subscription OID: %s",
2076 }
2077
2078 if (PQntuples(res) != 1 && !dry_run)
2079 {
2080 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
2081 PQntuples(res), 1);
2083 }
2084
2085 if (dry_run)
2086 {
2089 }
2090 else
2091 {
2092 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2093 lsnstr = psprintf("%s", lsn);
2094 }
2095
2096 PQclear(res);
2097
2098 /*
2099 * The origin name is defined as pg_%u. %u is the subscription OID. See
2100 * ApplyWorkerMain().
2101 */
2102 originname = psprintf("pg_%u", suboid);
2103
2104 if (dry_run)
2105 pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2106 originname, lsnstr, dbinfo->dbname);
2107 else
2108 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2109 originname, lsnstr, dbinfo->dbname);
2110
2113 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
2115
2116 pg_log_debug("command is: %s", str->data);
2117
2118 if (!dry_run)
2119 {
2120 res = PQexec(conn, str->data);
2121 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2122 {
2123 pg_log_error("could not set replication progress for subscription \"%s\": %s",
2124 dbinfo->subname, PQresultErrorMessage(res));
2126 }
2127 PQclear(res);
2128 }
2129
2133 pg_free(lsnstr);
2135}
2136
2137/*
2138 * Enables the subscription.
2139 *
2140 * The subscription was created in a previous step but it was disabled. After
2141 * adjusting the initial logical replication location, enable the subscription.
2142 */
2143static void
2145{
2147 PGresult *res;
2148 char *subname;
2149
2150 Assert(conn != NULL);
2151
2152 subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2153
2154 if (dry_run)
2155 pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
2156 dbinfo->subname, dbinfo->dbname);
2157 else
2158 pg_log_info("enabling subscription \"%s\" in database \"%s\"",
2159 dbinfo->subname, dbinfo->dbname);
2160
2161 appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
2162
2163 pg_log_debug("command is: %s", str->data);
2164
2165 if (!dry_run)
2166 {
2167 res = PQexec(conn, str->data);
2168 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2169 {
2170 pg_log_error("could not enable subscription \"%s\": %s",
2171 dbinfo->subname, PQresultErrorMessage(res));
2173 }
2174
2175 PQclear(res);
2176 }
2177
2180}
2181
2182/*
2183 * Fetch a list of all connectable non-template databases from the source server
2184 * and form a list such that they appear as if the user has specified multiple
2185 * --database options, one for each source database.
2186 */
2187static void
2189 bool dbnamespecified)
2190{
2191 PGconn *conn;
2192 PGresult *res;
2193
2194 /* If a database name was specified, just connect to it. */
2195 if (dbnamespecified)
2197 else
2198 {
2199 /* Otherwise, try postgres first and then template1. */
2200 char *conninfo;
2201
2202 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2203 conn = connect_database(conninfo, false);
2204 pg_free(conninfo);
2205 if (!conn)
2206 {
2207 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2208 conn = connect_database(conninfo, true);
2209 pg_free(conninfo);
2210 }
2211 }
2212
2213 res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2214 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2215 {
2216 pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2217 PQclear(res);
2219 }
2220
2221 for (int i = 0; i < PQntuples(res); i++)
2222 {
2223 const char *dbname = PQgetvalue(res, i, 0);
2224
2226
2227 /* Increment num_dbs to reflect multiple --database options */
2228 num_dbs++;
2229 }
2230
2231 PQclear(res);
2232 disconnect_database(conn, false);
2233}
2234
/*
 * main
 *
 * Program entry point.  Judging from the code visible here, the flow is:
 *
 *   - answer --help/-? and -V/--version when given as the first argument;
 *   - refuse to run as root (non-Windows builds only);
 *   - parse the command line and reject invalid input: -a/--all combined
 *     with --database, --publication, --replication-slot or --subscription;
 *     leftover non-option arguments; a missing subscriber data directory or
 *     publisher connection string; object-name counts that differ from the
 *     number of databases; unknown --clean object types;
 *   - fail if the publisher and subscriber system identifiers differ, or if
 *     postmaster.pid already exists in the subscriber data directory;
 *   - start and stop the standby around the conversion steps described by
 *     the inline comments below, then set "success" and log "Done!".
 *
 * Returns 0 on success.  Failures are reported with pg_log_error() followed
 * by exit(1), or with pg_fatal().
 *
 * NOTE(review): this listing appears to be missing a number of statements
 * (several case labels and step comments have no visible body) -- confirm
 * against the upstream file before relying on the summary above.
 */
2235int
2236main(int argc, char **argv)
2237{
 /*
  * Options without a short form use the small integer codes 1..5 as their
  * return value; the switch below dispatches on those codes.
  */
2238 static struct option long_options[] =
2239 {
2240 {"all", no_argument, NULL, 'a'},
2241 {"database", required_argument, NULL, 'd'},
2242 {"pgdata", required_argument, NULL, 'D'},
2243 {"logdir", required_argument, NULL, 'l'},
2244 {"dry-run", no_argument, NULL, 'n'},
2245 {"subscriber-port", required_argument, NULL, 'p'},
2246 {"publisher-server", required_argument, NULL, 'P'},
2247 {"socketdir", required_argument, NULL, 's'},
2248 {"recovery-timeout", required_argument, NULL, 't'},
2249 {"enable-two-phase", no_argument, NULL, 'T'},
2250 {"subscriber-username", required_argument, NULL, 'U'},
2251 {"verbose", no_argument, NULL, 'v'},
2252 {"version", no_argument, NULL, 'V'},
2253 {"help", no_argument, NULL, '?'},
2254 {"config-file", required_argument, NULL, 1},
2255 {"publication", required_argument, NULL, 2},
2256 {"replication-slot", required_argument, NULL, 3},
2257 {"subscription", required_argument, NULL, 4},
2258 {"clean", required_argument, NULL, 5},
2259 {NULL, 0, NULL, 0}
2260 };
2261
2262 struct CreateSubscriberOptions opt = {0};
2263
2264 int c;
2265 int option_index;
2266
2267 char *pub_base_conninfo;
2268 char *sub_base_conninfo;
2269 char *dbname_conninfo = NULL;
2270
2273 struct stat statbuf;
2274
2275 char *consistent_lsn;
2276
2277 char pidfile[MAXPGPATH];
2278
2279 pg_logging_init(argv[0]);
2281 progname = get_progname(argv[0]);
2282 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2283
 /*
  * Help and version requests are recognized here only when they are the
  * first argument; they exit before the root check and option parsing.
  */
2284 if (argc > 1)
2285 {
2286 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2287 {
2288 usage();
2289 exit(0);
2290 }
2291 else if (strcmp(argv[1], "-V") == 0
2292 || strcmp(argv[1], "--version") == 0)
2293 {
2294 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2295 exit(0);
2296 }
2297 }
2298
2299 /* Default settings */
2301 opt.config_file = NULL;
2302 opt.log_dir = NULL;
2303 opt.pub_conninfo_str = NULL;
2304 opt.socket_dir = NULL;
2306 opt.sub_username = NULL;
2307 opt.two_phase = false;
2309 {
2310 0
2311 };
2312 opt.recovery_timeout = 0;
2313 opt.all_dbs = false;
2314
2315 /*
2316 * Don't allow it to be run as root. It uses pg_ctl which does not allow
2317 * it either.
2318 */
2319#ifndef WIN32
2320 if (geteuid() == 0)
2321 {
2322 pg_log_error("cannot be executed by \"root\"");
2323 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2324 progname);
2325 exit(1);
2326 }
2327#endif
2328
2330
2331 while ((c = getopt_long(argc, argv, "ad:D:l:np:P:s:t:TU:v",
2332 long_options, &option_index)) != -1)
2333 {
2334 switch (c)
2335 {
2336 case 'a':
2337 opt.all_dbs = true;
2338 break;
2339 case 'd':
2341 {
2343 num_dbs++;
2344 }
2345 else
2346 pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2347 break;
2348 case 'D':
2351 break;
2352 case 'l':
2353 opt.log_dir = pg_strdup(optarg);
2355 break;
2356 case 'n':
2357 dry_run = true;
2358 break;
2359 case 'p':
2360 opt.sub_port = pg_strdup(optarg);
2361 break;
2362 case 'P':
2364 break;
2365 case 's':
2368 break;
2369 case 't':
2371 break;
2372 case 'T':
2373 opt.two_phase = true;
2374 break;
2375 case 'U':
2377 break;
2378 case 'v':
2380 break;
2381 case 1:
2383 break;
2384 case 2:
2386 {
2388 num_pubs++;
2389 }
2390 else
2391 pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
2392 break;
2393 case 3:
2395 {
2397 num_replslots++;
2398 }
2399 else
2400 pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2401 break;
2402 case 4:
2404 {
2406 num_subs++;
2407 }
2408 else
2409 pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2410 break;
2411 case 5:
2414 else
2415 pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
2416 break;
2417 default:
2418 /* getopt_long already emitted a complaint */
2419 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2420 exit(1);
2421 }
2422 }
2423
2424 /* Validate that --all is not used with incompatible options */
2425 if (opt.all_dbs)
2426 {
2427 char *bad_switch = NULL;
2428
 /* Only the first conflicting switch found is reported */
2429 if (num_dbs > 0)
2430 bad_switch = "--database";
2431 else if (num_pubs > 0)
2432 bad_switch = "--publication";
2433 else if (num_replslots > 0)
2434 bad_switch = "--replication-slot";
2435 else if (num_subs > 0)
2436 bad_switch = "--subscription";
2437
2438 if (bad_switch)
2439 {
2440 pg_log_error("options %s and %s cannot be used together",
2441 bad_switch, "-a/--all");
2442 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2443 exit(1);
2444 }
2445 }
2446
2447 /* Any non-option arguments? */
2448 if (optind < argc)
2449 {
2450 pg_log_error("too many command-line arguments (first is \"%s\")",
2451 argv[optind]);
2452 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2453 exit(1);
2454 }
2455
2456 /* Required arguments */
2457 if (subscriber_dir == NULL)
2458 {
2459 pg_log_error("no subscriber data directory specified");
2460 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2461 exit(1);
2462 }
2463
2464 /* If socket directory is not provided, use the current directory */
2465 if (opt.socket_dir == NULL)
2466 {
2467 char cwd[MAXPGPATH];
2468
2469 if (!getcwd(cwd, MAXPGPATH))
2470 pg_fatal("could not determine current directory");
2471 opt.socket_dir = pg_strdup(cwd);
2473 }
2474
2475 /*
2476 * Parse connection string. Build a base connection string that might be
2477 * reused by multiple databases.
2478 */
2479 if (opt.pub_conninfo_str == NULL)
2480 {
2481 /*
2482 * TODO use primary_conninfo (if available) from subscriber and
2483 * extract publisher connection string. Assume that there are
2484 * identical entries for physical and logical replication. If there is
2485 * not, we would fail anyway.
2486 */
2487 pg_log_error("no publisher connection string specified");
2488 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2489 exit(1);
2490 }
2491
2492 if (opt.log_dir != NULL)
2493 {
2494 char *internal_log_file;
2496
2498
2499 /*
2500 * Set mask based on PGDATA permissions, needed for the creation of
2501 * the output directories with correct permissions, similar with
2502 * pg_ctl and pg_upgrade.
2503 *
2504 * Don't error here if the data directory cannot be stat'd. Upcoming
2505 * checks for the data directory would raise the fatal error later.
2506 */
2509
2512
2515 pg_fatal("could not open log file \"%s\": %m", internal_log_file);
2516
2518
2520 }
2521
2522 if (dry_run)
2523 pg_log_info("Executing in dry-run mode.\n"
2524 "The target directory will not be modified.");
2525
2526 pg_log_info("validating publisher connection string");
2529 if (pub_base_conninfo == NULL)
2530 exit(1);
2531
2532 pg_log_info("validating subscriber connection string");
2534
2535 /*
2536 * Fetch all databases from the source (publisher) and treat them as if
2537 * the user had specified multiple --database options, one for each source
2538 * database.
2539 */
2540 if (opt.all_dbs)
2541 {
2543
2545 }
2546
2547 if (opt.database_names.head == NULL)
2548 {
2549 pg_log_info("no database was specified");
2550
2551 /*
2552 * Try to obtain the dbname from the publisher conninfo. If dbname
2553 * parameter is not available, error out.
2554 */
2555 if (dbname_conninfo)
2556 {
2558 num_dbs++;
2559
2560 pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2562 }
2563 else
2564 {
2565 pg_log_error("no database name specified");
2566 pg_log_error_hint("Try \"%s --help\" for more information.",
2567 progname);
2568 exit(1);
2569 }
2570 }
2571
2572 /* When given, the number of object names must match the number of databases */
2573 if (num_pubs > 0 && num_pubs != num_dbs)
2574 {
2575 pg_log_error("wrong number of publication names specified");
2576 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2577 num_pubs, num_dbs);
2578 exit(1);
2579 }
2580 if (num_subs > 0 && num_subs != num_dbs)
2581 {
2582 pg_log_error("wrong number of subscription names specified");
2583 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2584 num_subs, num_dbs);
2585 exit(1);
2586 }
2587 if (num_replslots > 0 && num_replslots != num_dbs)
2588 {
2589 pg_log_error("wrong number of replication slot names specified");
2590 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2592 exit(1);
2593 }
2594
2595 /* Verify the object types specified for removal from the subscriber */
2596 for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2597 {
 /* The comparison is case-insensitive; "publications" is the only accepted value */
2598 if (pg_strcasecmp(cell->val, "publications") == 0)
2600 else
2601 {
2602 pg_log_error("invalid object type \"%s\" specified for %s",
2603 cell->val, "--clean");
2604 pg_log_error_hint("The valid value is: \"%s\"", "publications");
2605 exit(1);
2606 }
2607 }
2608
2609 /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2610 pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2611 pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2612
2613 /* Rudimentary check for a data directory */
2615
2617
2618 /*
2619 * Store database information for publisher and subscriber. It should be
2620 * called before atexit() because its return value is used in
2621 * cleanup_objects_atexit().
2622 */
2624
2625 /* Register a function to clean up objects in case of failure */
2627
2628 /*
2629 * Check if the subscriber data directory has the same system identifier
2630 * as the publisher data directory.
2631 */
2634 if (pub_sysid != sub_sysid)
2635 pg_fatal("subscriber data directory is not a copy of the source database cluster");
2636
2637 /* Subscriber PID file */
2638 snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2639
2640 /*
2641 * The standby server must not be running. If the server is started under
2642 * service manager and pg_createsubscriber stops it, the service manager
2643 * might react to this action and start the server again. Therefore,
2644 * refuse to proceed if the server is running to avoid possible failures.
2645 *
2645 * The mere existence of postmaster.pid is what is treated as "running".
2645 */
2646 if (stat(pidfile, &statbuf) == 0)
2647 {
2648 pg_log_error("standby server is running");
2649 pg_log_error_hint("Stop the standby server and try again.");
2650 exit(1);
2651 }
2652
2653 /*
2654 * Start a short-lived standby server with temporary parameters (provided
2655 * by command-line options). The goal is to avoid connections during the
2656 * transformation steps.
2657 */
2658 pg_log_info("starting the standby server with command-line options");
2659 start_standby_server(&opt, true, false);
2660
2661 /* Check if the standby server is ready for logical replication */
2663
2664 /* Check if the primary server is ready for logical replication */
2666
2667 /*
2668 * Stop the target server. The recovery process requires that the server
2669 * reaches a consistent state before targeting the recovery stop point.
2670 * Make sure a consistent state is reached (stopping the target server
2671 * guarantees it) *before* creating the replication slots in
2672 * setup_publisher().
2673 */
2674 pg_log_info("stopping the subscriber");
2676
2677 /* Create the required objects for each database on publisher */
2679
2680 /* Write the required recovery parameters */
2682
2683 /*
2684 * Start subscriber so the recovery parameters will take effect. Wait
2685 * until accepting connections. We don't want to start logical replication
2686 * during setup.
2687 */
2688 pg_log_info("starting the subscriber");
2689 start_standby_server(&opt, true, true);
2690
2691 /* Wait for the subscriber to be promoted */
2693
2694 /*
2695 * Create the subscription for each database on subscriber. It does not
2696 * enable it immediately because it needs to adjust the replication start
2697 * point to the LSN reported by setup_publisher(). It also cleans up
2698 * publications created by this tool and replication to the standby.
2699 */
2701
2702 /* Remove primary_slot_name if it exists on primary */
2704
2705 /* Remove failover replication slots if they exist on subscriber */
2707
2708 /* Stop the subscriber */
2709 pg_log_info("stopping the subscriber");
2711
2712 /* Change system identifier from subscriber */
2714
 /* Mark the run as successful only after every step above has completed */
2715 success = true;
2716
2717 pg_log_info("Done!");
2718
2719 return 0;
2720}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1603
#define Assert(condition)
Definition c.h:943
#define PG_TEXTDOMAIN(domain)
Definition c.h:1303
uint64_t uint64
Definition c.h:625
uint32_t uint32
Definition c.h:624
int find_my_exec(const char *argv0, char *retpath)
Definition exec.c:161
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition exec.c:430
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition exec.c:311
int main(void)
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition connect.h:25
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
#define USECS_PER_SEC
Definition timestamp.h:134
#define _(x)
Definition elog.c:95
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:783
PGconn * PQconnectdb(const char *conninfo)
Definition fe-connect.c:830
void PQconninfoFree(PQconninfoOption *connOptions)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
void PQfreemem(void *ptr)
Definition fe-exec.c:4049
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition fe-exec.c:4399
PGresult * PQexec(PGconn *conn, const char *query)
Definition fe-exec.c:2279
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition fe-exec.c:4405
void * pg_malloc(size_t size)
Definition fe_memutils.c:47
char * pg_strdup(const char *in)
Definition fe_memutils.c:85
void pg_free(void *ptr)
#define pg_malloc_array(type, count)
Definition fe_memutils.h:56
uint32 get_pg_version(const char *datadir, char **version_str)
Definition version.c:44
int pg_mode_mask
Definition file_perm.c:25
int pg_dir_create_mode
Definition file_perm.c:18
bool GetDataDirectoryCreatePerm(const char *dataDir)
#define PG_MODE_MASK_OWNER
Definition file_perm.h:24
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition getopt_long.c:60
#define no_argument
Definition getopt_long.h:25
#define required_argument
Definition getopt_long.h:26
const char * str
long val
Definition informix.c:689
int i
Definition isn.c:77
#define PQresultErrorMessage
#define PQgetvalue
#define PQclear
#define PQresultStatus
#define PQntuples
@ CONNECTION_OK
Definition libpq-fe.h:90
@ PGRES_COMMAND_OK
Definition libpq-fe.h:131
@ PGRES_TUPLES_OK
Definition libpq-fe.h:134
static struct pg_tm tm
Definition localtime.c:104
void pg_logging_increase_verbosity(void)
Definition logging.c:187
void pg_logging_set_logfile(FILE *logfile)
Definition logging.c:210
void pg_logging_init(const char *argv0)
Definition logging.c:85
void pg_logging_set_level(enum pg_log_level new_level)
Definition logging.c:178
#define pg_log_error(...)
Definition logging.h:108
#define pg_log_error_hint(...)
Definition logging.h:114
#define pg_log_info(...)
Definition logging.h:126
#define pg_log_warning_hint(...)
Definition logging.h:123
#define pg_log_info_hint(...)
Definition logging.h:132
@ PG_LOG_WARNING
Definition logging.h:38
#define pg_log_warning_detail(...)
Definition logging.h:120
#define pg_log_error_detail(...)
Definition logging.h:111
#define pg_log_debug(...)
Definition logging.h:135
static char * errmsg
#define pg_fatal(...)
static PQExpBuffer recoveryconfcontents
#define MAXPGPATH
#define SERVER_LOG_FILE_NAME
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
#define WAIT_INTERVAL
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
#define INTERNAL_LOG_FILE_NAME
static void stop_standby_server(const char *datadir)
static char * pg_ctl_path
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static char * logdir
static char * get_exec_path(const char *argv0, const char *progname)
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static char * subscriber_dir
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
static char * primary_slot_name
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static pg_prng_state prng_state
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
static void check_data_directory(const char *datadir)
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
#define INCLUDED_CONF_FILE
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static struct LogicalRepInfos dbinfos
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
static bool success
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static bool recovery_ended
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static void drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void disconnect_database(PGconn *conn, bool exit_on_error)
static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static bool find_publication(PGconn *conn, const char *pubname, const char *dbname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static bool dry_run
static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
#define OBJECTTYPE_PUBLICATIONS
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static char * pg_resetwal_path
static char * generate_object_name(PGconn *conn)
static const char * progname
static void make_output_dirs(const char *log_basedir)
static void usage(void)
#define INCLUDED_CONF_FILE_DISABLED
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
static bool recovery_params_set
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
static char * argv0
Definition pg_ctl.c:94
static char * exec_path
Definition pg_ctl.c:89
PGDLLIMPORT int optind
Definition getopt.c:47
PGDLLIMPORT char * optarg
Definition getopt.c:49
uint32 pg_prng_uint32(pg_prng_state *state)
Definition pg_prng.c:227
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition pg_prng.c:89
char * datadir
NameData subname
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define pg_log_warning(...)
Definition pgfnames.c:24
int64 timestamp
int pg_strcasecmp(const char *s1, const char *s2)
const char * pg_strsignal(int signum)
Definition pgstrsignal.c:39
void canonicalize_path(char *path)
Definition path.c:337
#define snprintf
Definition port.h:260
#define DEVNULL
Definition port.h:161
const char * get_progname(const char *argv0)
Definition path.c:652
#define printf(...)
Definition port.h:266
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45
#define InvalidOid
unsigned int Oid
PQExpBuffer createPQExpBuffer(void)
Definition pqexpbuffer.c:72
void resetPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
char * c
static int fd(const char *x, int i)
static int fb(int x)
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
void get_restricted_token(void)
void pg_usleep(long microsec)
Definition signal.c:53
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition simple_list.c:87
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition simple_list.c:63
char * dbname
Definition streamutil.c:49
PGconn * conn
Definition streamutil.c:52
void appendShellString(PQExpBuffer buf, const char *str)
void appendConnStrVal(PQExpBuffer buf, const char *str)
SimpleStringList database_names
SimpleStringList objecttypes_to_clean
SimpleStringList replslot_names
struct LogicalRepInfo * dbinfo
SimpleStringListCell * head
Definition simple_list.h:42
#define GET_PG_MAJORVERSION_NUM(v)
Definition version.h:19
char * wait_result_to_str(int exitstatus)
Definition wait_error.c:33
#define stat
Definition win32_port.h:74
#define mkdir(a, b)
Definition win32_port.h:80
#define WIFEXITED(w)
Definition win32_port.h:150
#define WIFSIGNALED(w)
Definition win32_port.h:151
#define WTERMSIG(w)
Definition win32_port.h:153
#define WEXITSTATUS(w)
Definition win32_port.h:152
int gettimeofday(struct timeval *tp, void *tzp)
int wal_level
Definition xlog.c:138
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28