PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
pg_createsubscriber.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * pg_createsubscriber.c
4 * Create a new logical replica from a standby server
5 *
6 * Copyright (c) 2024-2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/bin/pg_basebackup/pg_createsubscriber.c
10 *
11 *-------------------------------------------------------------------------
12 */
13
14#include "postgres_fe.h"
15
16#include <sys/stat.h>
17#include <sys/time.h>
18#include <sys/wait.h>
19#include <time.h>
20
21#include "common/connect.h"
23#include "common/logging.h"
24#include "common/pg_prng.h"
29#include "getopt_long.h"
30
/* Default subscriber port used when -p/--subscriber-port is not given */
#define DEFAULT_SUB_PORT "50432"

/* Bit flags for the object types accepted by -R/--remove */
#define OBJECTTYPE_PUBLICATIONS 0x0001
33
34/* Command-line options */
36{
37 char *config_file; /* configuration file */
38 char *pub_conninfo_str; /* publisher connection string */
39 char *socket_dir; /* directory for Unix-domain socket, if any */
40 char *sub_port; /* subscriber port number */
41 const char *sub_username; /* subscriber username */
42 bool two_phase; /* enable-two-phase option */
43 SimpleStringList database_names; /* list of database names */
44 SimpleStringList pub_names; /* list of publication names */
45 SimpleStringList sub_names; /* list of subscription names */
46 SimpleStringList replslot_names; /* list of replication slot names */
47 int recovery_timeout; /* stop recovery after this time */
48 bool all_dbs; /* all option */
49 SimpleStringList objecttypes_to_remove; /* list of object types to remove */
50};
51
/* per-database publication/subscription info */
struct LogicalRepInfo
{
	char	   *dbname;			/* database name */
	char	   *pubconninfo;	/* publisher connection string */
	char	   *subconninfo;	/* subscriber connection string */
	char	   *pubname;		/* publication name */
	char	   *subname;		/* subscription name */
	char	   *replslotname;	/* replication slot name */

	/*
	 * Track which objects this run created on the publisher, so that
	 * cleanup_objects_atexit() knows what to drop if a later step fails.
	 */
	bool		made_replslot;	/* replication slot was created */
	bool		made_publication;	/* publication was created */
};
65
66/*
67 * Information shared across all the databases (or publications and
68 * subscriptions).
69 */
71{
73 bool two_phase; /* enable-two-phase option */
74 bits32 objecttypes_to_remove; /* flags indicating which object types
75 * to remove on subscriber */
76};
77
78static void cleanup_objects_atexit(void);
79static void usage();
80static char *get_base_conninfo(const char *conninfo, char **dbname);
81static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
82static char *get_exec_path(const char *argv0, const char *progname);
83static void check_data_directory(const char *datadir);
84static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
85static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
86 const char *pub_base_conninfo,
87 const char *sub_base_conninfo);
88static PGconn *connect_database(const char *conninfo, bool exit_on_error);
89static void disconnect_database(PGconn *conn, bool exit_on_error);
90static uint64 get_primary_sysid(const char *conninfo);
91static uint64 get_standby_sysid(const char *datadir);
92static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
94static char *generate_object_name(PGconn *conn);
95static void check_publisher(const struct LogicalRepInfo *dbinfo);
96static char *setup_publisher(struct LogicalRepInfo *dbinfo);
97static void check_subscriber(const struct LogicalRepInfo *dbinfo);
98static void setup_subscriber(struct LogicalRepInfo *dbinfo,
99 const char *consistent_lsn);
100static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
101 const char *lsn);
102static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
103 const char *slotname);
104static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
106 struct LogicalRepInfo *dbinfo);
107static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
108 const char *slot_name);
109static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
110static void start_standby_server(const struct CreateSubscriberOptions *opt,
111 bool restricted_access,
112 bool restrict_logical_worker);
113static void stop_standby_server(const char *datadir);
114static void wait_for_end_recovery(const char *conninfo,
115 const struct CreateSubscriberOptions *opt);
116static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
117static void drop_publication(PGconn *conn, const char *pubname,
118 const char *dbname, bool *made_publication);
119static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
120static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
121static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
122 const char *lsn);
123static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
125 const struct LogicalRepInfo *dbinfo);
126static void drop_existing_subscriptions(PGconn *conn, const char *subname,
127 const char *dbname);
129 bool dbnamespecified);
130
#define USEC_PER_SEC	1000000 /* microseconds per second */
#define WAIT_INTERVAL	1		/* 1 second */
133
134static const char *progname;
135
136static char *primary_slot_name = NULL;
137static bool dry_run = false;
138
139static bool success = false;
140
142static int num_dbs = 0; /* number of specified databases */
143static int num_pubs = 0; /* number of specified publications */
144static int num_subs = 0; /* number of specified subscriptions */
145static int num_replslots = 0; /* number of specified replication slots */
146
148
149static char *pg_ctl_path = NULL;
150static char *pg_resetwal_path = NULL;
151
152/* standby / subscriber data directory */
153static char *subscriber_dir = NULL;
154
155static bool recovery_ended = false;
156static bool standby_running = false;
157
/*
 * Possible outcomes when waiting for the postmaster (presumably used by the
 * server start/wait logic later in this file -- not visible here).
 */
enum WaitPMResult
{
	POSTMASTER_READY,
	POSTMASTER_STILL_STARTING
};
164
165/*
166 * Cleanup objects that were created by pg_createsubscriber if there is an
167 * error.
168 *
169 * Publications and replication slots are created on primary. Depending on the
170 * step it failed, it should remove the already created objects if it is
171 * possible (sometimes it won't work due to a connection issue).
172 * There is no cleanup on the target server. The steps on the target server are
173 * executed *after* promotion, hence, at this point, a failure means recreate
174 * the physical replica and start again.
175 */
176static void
178{
179 if (success)
180 return;
181
182 /*
183 * If the server is promoted, there is no way to use the current setup
184 * again. Warn the user that a new replication setup should be done before
185 * trying again.
186 */
187 if (recovery_ended)
188 {
189 pg_log_warning("failed after the end of recovery");
190 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
191 "You must recreate the physical replica before continuing.");
192 }
193
194 for (int i = 0; i < num_dbs; i++)
195 {
196 struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
197
198 if (dbinfo->made_publication || dbinfo->made_replslot)
199 {
200 PGconn *conn;
201
202 conn = connect_database(dbinfo->pubconninfo, false);
203 if (conn != NULL)
204 {
205 if (dbinfo->made_publication)
206 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
207 &dbinfo->made_publication);
208 if (dbinfo->made_replslot)
209 drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
211 }
212 else
213 {
214 /*
215 * If a connection could not be established, inform the user
216 * that some objects were left on primary and should be
217 * removed before trying again.
218 */
219 if (dbinfo->made_publication)
220 {
221 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
222 dbinfo->pubname,
223 dbinfo->dbname);
224 pg_log_warning_hint("Drop this publication before trying again.");
225 }
226 if (dbinfo->made_replslot)
227 {
228 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
229 dbinfo->replslotname,
230 dbinfo->dbname);
231 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
232 }
233 }
234 }
235 }
236
237 if (standby_running)
239}
240
241static void
242usage(void)
243{
244 printf(_("%s creates a new logical replica from a standby server.\n\n"),
245 progname);
246 printf(_("Usage:\n"));
247 printf(_(" %s [OPTION]...\n"), progname);
248 printf(_("\nOptions:\n"));
249 printf(_(" -a, --all create subscriptions for all databases except template\n"
250 " databases or databases that don't allow connections\n"));
251 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
252 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
253 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
254 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
255 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
256 printf(_(" -R, --remove=OBJECTTYPE remove all objects of the specified type from specified\n"
257 " databases on the subscriber; accepts: publications\n"));
258 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
259 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
260 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
261 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
262 printf(_(" -v, --verbose output verbose messages\n"));
263 printf(_(" --config-file=FILENAME use specified main server configuration\n"
264 " file when running target cluster\n"));
265 printf(_(" --publication=NAME publication name\n"));
266 printf(_(" --replication-slot=NAME replication slot name\n"));
267 printf(_(" --subscription=NAME subscription name\n"));
268 printf(_(" -V, --version output version information, then exit\n"));
269 printf(_(" -?, --help show this help, then exit\n"));
270 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
271 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
272}
273
274/*
275 * Subroutine to append "keyword=value" to a connection string,
276 * with proper quoting of the value. (We assume keywords don't need that.)
277 */
278static void
279appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
280{
281 if (buf->len > 0)
283 appendPQExpBufferStr(buf, keyword);
286}
287
288/*
289 * Validate a connection string. Returns a base connection string that is a
290 * connection string without a database name.
291 *
292 * Since we might process multiple databases, each database name will be
293 * appended to this base connection string to provide a final connection
294 * string. If the second argument (dbname) is not null, returns dbname if the
295 * provided connection string contains it.
296 *
297 * It is the caller's responsibility to free the returned connection string and
298 * dbname.
299 */
300static char *
301get_base_conninfo(const char *conninfo, char **dbname)
302{
304 PQconninfoOption *conn_opts;
305 PQconninfoOption *conn_opt;
306 char *errmsg = NULL;
307 char *ret;
308
309 conn_opts = PQconninfoParse(conninfo, &errmsg);
310 if (conn_opts == NULL)
311 {
312 pg_log_error("could not parse connection string: %s", errmsg);
314 return NULL;
315 }
316
318 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
319 {
320 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
321 {
322 if (strcmp(conn_opt->keyword, "dbname") == 0)
323 {
324 if (dbname)
325 *dbname = pg_strdup(conn_opt->val);
326 continue;
327 }
328 appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
329 }
330 }
331
332 ret = pg_strdup(buf->data);
333
335 PQconninfoFree(conn_opts);
336
337 return ret;
338}
339
340/*
341 * Build a subscriber connection string. Only a few parameters are supported
342 * since it starts a server with restricted access.
343 */
344static char *
346{
348 char *ret;
349
350 appendConnStrItem(buf, "port", opt->sub_port);
351#if !defined(WIN32)
352 appendConnStrItem(buf, "host", opt->socket_dir);
353#endif
354 if (opt->sub_username != NULL)
355 appendConnStrItem(buf, "user", opt->sub_username);
356 appendConnStrItem(buf, "fallback_application_name", progname);
357
358 ret = pg_strdup(buf->data);
359
361
362 return ret;
363}
364
365/*
366 * Verify if a PostgreSQL binary (progname) is available in the same directory as
367 * pg_createsubscriber and it has the same version. It returns the absolute
368 * path of the progname.
369 */
370static char *
371get_exec_path(const char *argv0, const char *progname)
372{
373 char *versionstr;
374 char *exec_path;
375 int ret;
376
377 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
379 ret = find_other_exec(argv0, progname, versionstr, exec_path);
380
381 if (ret < 0)
382 {
383 char full_path[MAXPGPATH];
384
385 if (find_my_exec(argv0, full_path) < 0)
386 strlcpy(full_path, progname, sizeof(full_path));
387
388 if (ret == -1)
389 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
390 progname, "pg_createsubscriber", full_path);
391 else
392 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
393 progname, full_path, "pg_createsubscriber");
394 }
395
396 pg_log_debug("%s path is: %s", progname, exec_path);
397
398 return exec_path;
399}
400
401/*
402 * Is it a cluster directory? These are preliminary checks. It is far from
403 * making an accurate check. If it is not a clone from the publisher, it will
404 * eventually fail in a future step.
405 */
406static void
408{
409 struct stat statbuf;
410 char versionfile[MAXPGPATH];
411
412 pg_log_info("checking if directory \"%s\" is a cluster data directory",
413 datadir);
414
415 if (stat(datadir, &statbuf) != 0)
416 {
417 if (errno == ENOENT)
418 pg_fatal("data directory \"%s\" does not exist", datadir);
419 else
420 pg_fatal("could not access directory \"%s\": %m", datadir);
421 }
422
423 snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
424 if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
425 {
426 pg_fatal("directory \"%s\" is not a database cluster directory",
427 datadir);
428 }
429}
430
431/*
432 * Append database name into a base connection string.
433 *
434 * dbname is the only parameter that changes so it is not included in the base
435 * connection string. This function concatenates dbname to build a "real"
436 * connection string.
437 */
438static char *
439concat_conninfo_dbname(const char *conninfo, const char *dbname)
440{
442 char *ret;
443
444 Assert(conninfo != NULL);
445
446 appendPQExpBufferStr(buf, conninfo);
447 appendConnStrItem(buf, "dbname", dbname);
448
449 ret = pg_strdup(buf->data);
451
452 return ret;
453}
454
455/*
456 * Store publication and subscription information.
457 *
458 * If publication, replication slot and subscription names were specified,
459 * store it here. Otherwise, a generated name will be assigned to the object in
460 * setup_publisher().
461 */
462static struct LogicalRepInfo *
464 const char *pub_base_conninfo,
465 const char *sub_base_conninfo)
466{
467 struct LogicalRepInfo *dbinfo;
468 SimpleStringListCell *pubcell = NULL;
469 SimpleStringListCell *subcell = NULL;
470 SimpleStringListCell *replslotcell = NULL;
471 int i = 0;
472
473 dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
474
475 if (num_pubs > 0)
476 pubcell = opt->pub_names.head;
477 if (num_subs > 0)
478 subcell = opt->sub_names.head;
479 if (num_replslots > 0)
480 replslotcell = opt->replslot_names.head;
481
482 for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
483 {
484 char *conninfo;
485
486 /* Fill publisher attributes */
487 conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
488 dbinfo[i].pubconninfo = conninfo;
489 dbinfo[i].dbname = cell->val;
490 if (num_pubs > 0)
491 dbinfo[i].pubname = pubcell->val;
492 else
493 dbinfo[i].pubname = NULL;
494 if (num_replslots > 0)
495 dbinfo[i].replslotname = replslotcell->val;
496 else
497 dbinfo[i].replslotname = NULL;
498 dbinfo[i].made_replslot = false;
499 dbinfo[i].made_publication = false;
500 /* Fill subscriber attributes */
501 conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
502 dbinfo[i].subconninfo = conninfo;
503 if (num_subs > 0)
504 dbinfo[i].subname = subcell->val;
505 else
506 dbinfo[i].subname = NULL;
507 /* Other fields will be filled later */
508
509 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
510 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
511 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
512 dbinfo[i].pubconninfo);
513 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
514 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
515 dbinfo[i].subconninfo,
516 dbinfos.two_phase ? "true" : "false");
517
518 if (num_pubs > 0)
519 pubcell = pubcell->next;
520 if (num_subs > 0)
521 subcell = subcell->next;
522 if (num_replslots > 0)
523 replslotcell = replslotcell->next;
524
525 i++;
526 }
527
528 return dbinfo;
529}
530
531/*
532 * Open a new connection. If exit_on_error is true, it has an undesired
533 * condition and it should exit immediately.
534 */
535static PGconn *
536connect_database(const char *conninfo, bool exit_on_error)
537{
538 PGconn *conn;
539 PGresult *res;
540
541 conn = PQconnectdb(conninfo);
543 {
544 pg_log_error("connection to database failed: %s",
546 PQfinish(conn);
547
548 if (exit_on_error)
549 exit(1);
550 return NULL;
551 }
552
553 /* Secure search_path */
556 {
557 pg_log_error("could not clear \"search_path\": %s",
559 PQclear(res);
560 PQfinish(conn);
561
562 if (exit_on_error)
563 exit(1);
564 return NULL;
565 }
566 PQclear(res);
567
568 return conn;
569}
570
571/*
572 * Close the connection. If exit_on_error is true, it has an undesired
573 * condition and it should exit immediately.
574 */
575static void
576disconnect_database(PGconn *conn, bool exit_on_error)
577{
578 Assert(conn != NULL);
579
580 PQfinish(conn);
581
582 if (exit_on_error)
583 exit(1);
584}
585
586/*
587 * Obtain the system identifier using the provided connection. It will be used
588 * to compare if a data directory is a clone of another one.
589 */
590static uint64
591get_primary_sysid(const char *conninfo)
592{
593 PGconn *conn;
594 PGresult *res;
595 uint64 sysid;
596
597 pg_log_info("getting system identifier from publisher");
598
599 conn = connect_database(conninfo, true);
600
601 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
603 {
604 pg_log_error("could not get system identifier: %s",
607 }
608 if (PQntuples(res) != 1)
609 {
610 pg_log_error("could not get system identifier: got %d rows, expected %d row",
611 PQntuples(res), 1);
613 }
614
615 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
616
617 pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
618
619 PQclear(res);
621
622 return sysid;
623}
624
625/*
626 * Obtain the system identifier from control file. It will be used to compare
627 * if a data directory is a clone of another one. This routine is used locally
628 * and avoids a connection.
629 */
630static uint64
632{
633 ControlFileData *cf;
634 bool crc_ok;
635 uint64 sysid;
636
637 pg_log_info("getting system identifier from subscriber");
638
639 cf = get_controlfile(datadir, &crc_ok);
640 if (!crc_ok)
641 pg_fatal("control file appears to be corrupt");
642
643 sysid = cf->system_identifier;
644
645 pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
646
647 pg_free(cf);
648
649 return sysid;
650}
651
652/*
653 * Modify the system identifier. Since a standby server preserves the system
654 * identifier, it makes sense to change it to avoid situations in which WAL
655 * files from one of the systems might be used in the other one.
656 */
657static void
659{
660 ControlFileData *cf;
661 bool crc_ok;
662 struct timeval tv;
663
664 char *cmd_str;
665
666 pg_log_info("modifying system identifier of subscriber");
667
668 cf = get_controlfile(subscriber_dir, &crc_ok);
669 if (!crc_ok)
670 pg_fatal("control file appears to be corrupt");
671
672 /*
673 * Select a new system identifier.
674 *
675 * XXX this code was extracted from BootStrapXLOG().
676 */
677 gettimeofday(&tv, NULL);
678 cf->system_identifier = ((uint64) tv.tv_sec) << 32;
679 cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
680 cf->system_identifier |= getpid() & 0xFFF;
681
682 if (!dry_run)
684
685 pg_log_info("system identifier is %" PRIu64 " on subscriber",
687
688 pg_log_info("running pg_resetwal on the subscriber");
689
690 cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
692
693 pg_log_debug("pg_resetwal command is: %s", cmd_str);
694
695 if (!dry_run)
696 {
697 int rc = system(cmd_str);
698
699 if (rc == 0)
700 pg_log_info("subscriber successfully changed the system identifier");
701 else
702 pg_fatal("could not change system identifier of subscriber: %s", wait_result_to_str(rc));
703 }
704
705 pg_free(cf);
706}
707
708/*
709 * Generate an object name using a prefix, database oid and a random integer.
710 * It is used in case the user does not specify an object name (publication,
711 * subscription, replication slot).
712 */
713static char *
715{
716 PGresult *res;
717 Oid oid;
718 uint32 rand;
719 char *objname;
720
721 res = PQexec(conn,
722 "SELECT oid FROM pg_catalog.pg_database "
723 "WHERE datname = pg_catalog.current_database()");
725 {
726 pg_log_error("could not obtain database OID: %s",
729 }
730
731 if (PQntuples(res) != 1)
732 {
733 pg_log_error("could not obtain database OID: got %d rows, expected %d row",
734 PQntuples(res), 1);
736 }
737
738 /* Database OID */
739 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
740
741 PQclear(res);
742
743 /* Random unsigned integer */
744 rand = pg_prng_uint32(&prng_state);
745
746 /*
747 * Build the object name. The name must not exceed NAMEDATALEN - 1. This
748 * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
749 * '\0').
750 */
751 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
752
753 return objname;
754}
755
756/*
757 * Create the publications and replication slots in preparation for logical
758 * replication. Returns the LSN from latest replication slot. It will be the
759 * replication start point that is used to adjust the subscriptions (see
760 * set_replication_progress).
761 */
762static char *
764{
765 char *lsn = NULL;
766
767 pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
768
769 for (int i = 0; i < num_dbs; i++)
770 {
771 PGconn *conn;
772 char *genname = NULL;
773
774 conn = connect_database(dbinfo[i].pubconninfo, true);
775
776 /*
777 * If an object name was not specified as command-line options, assign
778 * a generated object name. The replication slot has a different rule.
779 * The subscription name is assigned to the replication slot name if
780 * no replication slot is specified. It follows the same rule as
781 * CREATE SUBSCRIPTION.
782 */
783 if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
784 genname = generate_object_name(conn);
785 if (num_pubs == 0)
786 dbinfo[i].pubname = pg_strdup(genname);
787 if (num_subs == 0)
788 dbinfo[i].subname = pg_strdup(genname);
789 if (num_replslots == 0)
790 dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
791
792 /*
793 * Create publication on publisher. This step should be executed
794 * *before* promoting the subscriber to avoid any transactions between
795 * consistent LSN and the new publication rows (such transactions
796 * wouldn't see the new publication rows resulting in an error).
797 */
798 create_publication(conn, &dbinfo[i]);
799
800 /* Create replication slot on publisher */
801 if (lsn)
802 pg_free(lsn);
803 lsn = create_logical_replication_slot(conn, &dbinfo[i]);
804 if (lsn != NULL || dry_run)
805 pg_log_info("create replication slot \"%s\" on publisher",
806 dbinfo[i].replslotname);
807 else
808 exit(1);
809
810 /*
811 * Since we are using the LSN returned by the last replication slot as
812 * recovery_target_lsn, this LSN is ahead of the current WAL position
813 * and the recovery waits until the publisher writes a WAL record to
814 * reach the target and ends the recovery. On idle systems, this wait
815 * time is unpredictable and could lead to failure in promoting the
816 * subscriber. To avoid that, insert a harmless WAL record.
817 */
818 if (i == num_dbs - 1 && !dry_run)
819 {
820 PGresult *res;
821
822 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
824 {
825 pg_log_error("could not write an additional WAL record: %s",
828 }
829 PQclear(res);
830 }
831
833 }
834
835 return lsn;
836}
837
838/*
839 * Is recovery still in progress?
840 */
841static bool
843{
844 PGresult *res;
845 int ret;
846
847 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
848
850 {
851 pg_log_error("could not obtain recovery progress: %s",
854 }
855
856
857 ret = strcmp("t", PQgetvalue(res, 0, 0));
858
859 PQclear(res);
860
861 return ret == 0;
862}
863
864/*
865 * Is the primary server ready for logical replication?
866 *
867 * XXX Does it not allow a synchronous replica?
868 */
869static void
870check_publisher(const struct LogicalRepInfo *dbinfo)
871{
872 PGconn *conn;
873 PGresult *res;
874 bool failed = false;
875
876 char *wal_level;
877 int max_repslots;
878 int cur_repslots;
879 int max_walsenders;
880 int cur_walsenders;
881 int max_prepared_transactions;
882 char *max_slot_wal_keep_size;
883
884 pg_log_info("checking settings on publisher");
885
886 conn = connect_database(dbinfo[0].pubconninfo, true);
887
888 /*
889 * If the primary server is in recovery (i.e. cascading replication),
890 * objects (publication) cannot be created because it is read only.
891 */
893 {
894 pg_log_error("primary server cannot be in recovery");
896 }
897
898 /*------------------------------------------------------------------------
899 * Logical replication requires a few parameters to be set on publisher.
900 * Since these parameters are not a requirement for physical replication,
901 * we should check it to make sure it won't fail.
902 *
903 * - wal_level = logical
904 * - max_replication_slots >= current + number of dbs to be converted
905 * - max_wal_senders >= current + number of dbs to be converted
906 * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
907 * -----------------------------------------------------------------------
908 */
909 res = PQexec(conn,
910 "SELECT pg_catalog.current_setting('wal_level'),"
911 " pg_catalog.current_setting('max_replication_slots'),"
912 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
913 " pg_catalog.current_setting('max_wal_senders'),"
914 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
915 " pg_catalog.current_setting('max_prepared_transactions'),"
916 " pg_catalog.current_setting('max_slot_wal_keep_size')");
917
919 {
920 pg_log_error("could not obtain publisher settings: %s",
923 }
924
925 wal_level = pg_strdup(PQgetvalue(res, 0, 0));
926 max_repslots = atoi(PQgetvalue(res, 0, 1));
927 cur_repslots = atoi(PQgetvalue(res, 0, 2));
928 max_walsenders = atoi(PQgetvalue(res, 0, 3));
929 cur_walsenders = atoi(PQgetvalue(res, 0, 4));
930 max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
931 max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
932
933 PQclear(res);
934
935 pg_log_debug("publisher: wal_level: %s", wal_level);
936 pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
937 pg_log_debug("publisher: current replication slots: %d", cur_repslots);
938 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
939 pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
940 pg_log_debug("publisher: max_prepared_transactions: %d",
941 max_prepared_transactions);
942 pg_log_debug("publisher: max_slot_wal_keep_size: %s",
943 max_slot_wal_keep_size);
944
946
947 if (strcmp(wal_level, "logical") != 0)
948 {
949 pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
950 failed = true;
951 }
952
953 if (max_repslots - cur_repslots < num_dbs)
954 {
955 pg_log_error("publisher requires %d replication slots, but only %d remain",
956 num_dbs, max_repslots - cur_repslots);
957 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
958 "max_replication_slots", cur_repslots + num_dbs);
959 failed = true;
960 }
961
962 if (max_walsenders - cur_walsenders < num_dbs)
963 {
964 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
965 num_dbs, max_walsenders - cur_walsenders);
966 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
967 "max_wal_senders", cur_walsenders + num_dbs);
968 failed = true;
969 }
970
971 if (max_prepared_transactions != 0 && !dbinfos.two_phase)
972 {
973 pg_log_warning("two_phase option will not be enabled for replication slots");
974 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
975 "Prepared transactions will be replicated at COMMIT PREPARED.");
976 pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");
977 }
978
979 /*
980 * Validate 'max_slot_wal_keep_size'. If this parameter is set to a
981 * non-default value, it may cause replication failures due to required
982 * WAL files being prematurely removed.
983 */
984 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
985 {
986 pg_log_warning("required WAL could be removed from the publisher");
987 pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
988 "max_slot_wal_keep_size");
989 }
990
992
993 if (failed)
994 exit(1);
995}
996
997/*
998 * Is the standby server ready for logical replication?
999 *
1000 * XXX Does it not allow a time-delayed replica?
1001 *
1002 * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1003 * is S, it cannot detect there is a replica (server C) because server S starts
1004 * accepting only local connections and server C cannot connect to it. Hence,
1005 * there is not a reliable way to provide a suitable error saying the server C
1006 * will be broken at the end of this process (due to pg_resetwal).
1007 */
1008static void
1010{
1011 PGconn *conn;
1012 PGresult *res;
1013 bool failed = false;
1014
1015 int max_lrworkers;
1016 int max_reporigins;
1017 int max_wprocs;
1018
1019 pg_log_info("checking settings on subscriber");
1020
1021 conn = connect_database(dbinfo[0].subconninfo, true);
1022
1023 /* The target server must be a standby */
1025 {
1026 pg_log_error("target server must be a standby");
1028 }
1029
1030 /*------------------------------------------------------------------------
1031 * Logical replication requires a few parameters to be set on subscriber.
1032 * Since these parameters are not a requirement for physical replication,
1033 * we should check it to make sure it won't fail.
1034 *
1035 * - max_active_replication_origins >= number of dbs to be converted
1036 * - max_logical_replication_workers >= number of dbs to be converted
1037 * - max_worker_processes >= 1 + number of dbs to be converted
1038 *------------------------------------------------------------------------
1039 */
1040 res = PQexec(conn,
1041 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1042 "'max_logical_replication_workers', "
1043 "'max_active_replication_origins', "
1044 "'max_worker_processes', "
1045 "'primary_slot_name') "
1046 "ORDER BY name");
1047
1048 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1049 {
1050 pg_log_error("could not obtain subscriber settings: %s",
1053 }
1054
1055 max_reporigins = atoi(PQgetvalue(res, 0, 0));
1056 max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1057 max_wprocs = atoi(PQgetvalue(res, 2, 0));
1058 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1060
1061 pg_log_debug("subscriber: max_logical_replication_workers: %d",
1062 max_lrworkers);
1063 pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
1064 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1066 pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1067
1068 PQclear(res);
1069
1070 disconnect_database(conn, false);
1071
1072 if (max_reporigins < num_dbs)
1073 {
1074 pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1075 num_dbs, max_reporigins);
1076 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1077 "max_active_replication_origins", num_dbs);
1078 failed = true;
1079 }
1080
1081 if (max_lrworkers < num_dbs)
1082 {
1083 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1084 num_dbs, max_lrworkers);
1085 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1086 "max_logical_replication_workers", num_dbs);
1087 failed = true;
1088 }
1089
1090 if (max_wprocs < num_dbs + 1)
1091 {
1092 pg_log_error("subscriber requires %d worker processes, but only %d remain",
1093 num_dbs + 1, max_wprocs);
1094 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1095 "max_worker_processes", num_dbs + 1);
1096 failed = true;
1097 }
1098
1099 if (failed)
1100 exit(1);
1101}
1102
1103/*
1104 * Drop a specified subscription. This is to avoid duplicate subscriptions on
1105 * the primary (publisher node) and the newly created subscriber. We
1106 * shouldn't drop the associated slot as that would be used by the publisher
1107 * node.
1108 */
1109static void
1111{
1113 PGresult *res;
1114
1115 Assert(conn != NULL);
1116
1117 /*
1118 * Construct a query string. These commands are allowed to be executed
1119 * within a transaction.
1120 */
1121 appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1122 subname);
1123 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1124 subname);
1125 appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1126
1127 pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1128 subname, dbname);
1129
1130 if (!dry_run)
1131 {
1132 res = PQexec(conn, query->data);
1133
1134 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1135 {
1136 pg_log_error("could not drop subscription \"%s\": %s",
1139 }
1140
1141 PQclear(res);
1142 }
1143
1144 destroyPQExpBuffer(query);
1145}
1146
1147/*
1148 * Retrieve and drop the pre-existing subscriptions.
1149 */
1150static void
1152 const struct LogicalRepInfo *dbinfo)
1153{
1155 char *dbname;
1156 PGresult *res;
1157
1158 Assert(conn != NULL);
1159
1160 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1161
1162 appendPQExpBuffer(query,
1163 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1164 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1165 "WHERE d.datname = %s",
1166 dbname);
1167 res = PQexec(conn, query->data);
1168
1169 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1170 {
1171 pg_log_error("could not obtain pre-existing subscriptions: %s",
1174 }
1175
1176 for (int i = 0; i < PQntuples(res); i++)
1178 dbinfo->dbname);
1179
1180 PQclear(res);
1181 destroyPQExpBuffer(query);
1183}
1184
1185/*
1186 * Create the subscriptions, adjust the initial location for logical
1187 * replication and enable the subscriptions. That's the last step for logical
1188 * replication setup.
1189 */
1190static void
1191setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1192{
1193 for (int i = 0; i < num_dbs; i++)
1194 {
1195 PGconn *conn;
1196
1197 /* Connect to subscriber. */
1198 conn = connect_database(dbinfo[i].subconninfo, true);
1199
1200 /*
1201 * We don't need the pre-existing subscriptions on the newly formed
1202 * subscriber. They can connect to other publisher nodes and either
1203 * get some unwarranted data or can lead to ERRORs in connecting to
1204 * such nodes.
1205 */
1207
1208 /* Check and drop the required publications in the given database. */
1210
1211 create_subscription(conn, &dbinfo[i]);
1212
1213 /* Set the replication progress to the correct LSN */
1214 set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1215
1216 /* Enable subscription */
1217 enable_subscription(conn, &dbinfo[i]);
1218
1219 disconnect_database(conn, false);
1220 }
1221}
1222
1223/*
1224 * Write the required recovery parameters.
1225 */
1226static void
1227setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1228{
1229 PGconn *conn;
1231
1232 /*
1233 * Although the recovery parameters will be written to the subscriber,
1234 * use a publisher connection. The primary_conninfo is generated using the
1235 * connection settings.
1236 */
1237 conn = connect_database(dbinfo[0].pubconninfo, true);
1238
1239 /*
1240 * Write recovery parameters.
1241 *
1242 * The subscriber is not running yet. In dry run mode, the recovery
1243 * parameters *won't* be written. An invalid LSN is used for printing
1244 * purposes. Additional recovery parameters are added here. It avoids
1245 * unexpected behavior such as end of recovery as soon as a consistent
1246 * state is reached (recovery_target) and failure due to multiple recovery
1247 * targets (name, time, xid, LSN).
1248 */
1250 appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
1252 "recovery_target_timeline = 'latest'\n");
1254 "recovery_target_inclusive = true\n");
1256 "recovery_target_action = promote\n");
1257 appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
1258 appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
1259 appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
1260
1261 if (dry_run)
1262 {
1263 appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
1265 "recovery_target_lsn = '%X/%X'\n",
1267 }
1268 else
1269 {
1270 appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1271 lsn);
1273 }
1274 disconnect_database(conn, false);
1275
1276 pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1277}
1278
1279/*
1280 * Drop physical replication slot on primary if the standby was using it. After
1281 * the transformation, it has no use.
1282 *
1283 * XXX we might not fail here. Instead, we provide a warning so the user
1284 * eventually drops this replication slot later.
1285 */
1286static void
1287drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1288{
1289 PGconn *conn;
1290
1291 /* Replication slot does not exist, do nothing */
1292 if (!primary_slot_name)
1293 return;
1294
1295 conn = connect_database(dbinfo[0].pubconninfo, false);
1296 if (conn != NULL)
1297 {
1298 drop_replication_slot(conn, &dbinfo[0], slotname);
1299 disconnect_database(conn, false);
1300 }
1301 else
1302 {
1303 pg_log_warning("could not drop replication slot \"%s\" on primary",
1304 slotname);
1305 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1306 }
1307}
1308
1309/*
1310 * Drop failover replication slots on subscriber. After the transformation,
1311 * they have no use.
1312 *
1313 * XXX We do not fail here. Instead, we provide a warning so the user can drop
1314 * them later.
1315 */
1316static void
1318{
1319 PGconn *conn;
1320 PGresult *res;
1321
1322 conn = connect_database(dbinfo[0].subconninfo, false);
1323 if (conn != NULL)
1324 {
1325 /* Get failover replication slot names */
1326 res = PQexec(conn,
1327 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1328
1329 if (PQresultStatus(res) == PGRES_TUPLES_OK)
1330 {
1331 /* Remove failover replication slots from subscriber */
1332 for (int i = 0; i < PQntuples(res); i++)
1333 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1334 }
1335 else
1336 {
1337 pg_log_warning("could not obtain failover replication slot information: %s",
1339 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1340 }
1341
1342 PQclear(res);
1343 disconnect_database(conn, false);
1344 }
1345 else
1346 {
1347 pg_log_warning("could not drop failover replication slot");
1348 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1349 }
1350}
1351
1352/*
1353 * Create a logical replication slot and return its LSN.
1354 *
1355 * CreateReplicationSlot() is not used because it does not provide the one-row
1356 * result set that contains the LSN.
1357 */
1358static char *
1360{
1362 PGresult *res = NULL;
1363 const char *slot_name = dbinfo->replslotname;
1364 char *slot_name_esc;
1365 char *lsn = NULL;
1366
1367 Assert(conn != NULL);
1368
1369 pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
1370 slot_name, dbinfo->dbname);
1371
1372 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1373
1375 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1376 slot_name_esc,
1377 dbinfos.two_phase ? "true" : "false");
1378
1379 PQfreemem(slot_name_esc);
1380
1381 pg_log_debug("command is: %s", str->data);
1382
1383 if (!dry_run)
1384 {
1385 res = PQexec(conn, str->data);
1386 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1387 {
1388 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1389 slot_name, dbinfo->dbname,
1391 PQclear(res);
1393 return NULL;
1394 }
1395
1396 lsn = pg_strdup(PQgetvalue(res, 0, 0));
1397 PQclear(res);
1398 }
1399
1400 /* For cleanup purposes */
1401 dbinfo->made_replslot = true;
1402
1404
1405 return lsn;
1406}
1407
1408static void
1410 const char *slot_name)
1411{
1413 char *slot_name_esc;
1414 PGresult *res;
1415
1416 Assert(conn != NULL);
1417
1418 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1419 slot_name, dbinfo->dbname);
1420
1421 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1422
1423 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1424
1425 PQfreemem(slot_name_esc);
1426
1427 pg_log_debug("command is: %s", str->data);
1428
1429 if (!dry_run)
1430 {
1431 res = PQexec(conn, str->data);
1432 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1433 {
1434 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1435 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1436 dbinfo->made_replslot = false; /* don't try again. */
1437 }
1438
1439 PQclear(res);
1440 }
1441
1443}
1444
/*
 * Check the status returned by system() for a pg_ctl invocation.
 *
 * A zero rc means success and the function returns silently.  Otherwise it
 * reports how pg_ctl terminated (exit code, signal/exception, or an
 * unrecognized status), shows the command that failed, and exits with
 * status 1.
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	if (rc == 0)
		return;

	if (WIFEXITED(rc))
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	else if (WIFSIGNALED(rc))
	{
#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X",
					 WTERMSIG(rc));
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		int			signo = WTERMSIG(rc);

		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 signo, pg_strsignal(signo));
#endif
	}
	else
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);

	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1477
1478static void
1479start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1480 bool restrict_logical_worker)
1481{
1482 PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1483 int rc;
1484
1485 appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1486 appendShellString(pg_ctl_cmd, subscriber_dir);
1487 appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1488
1489 /* Prevent unintended slot invalidation */
1490 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1491
1492 if (restricted_access)
1493 {
1494 appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1495#if !defined(WIN32)
1496
1497 /*
1498 * An empty listen_addresses list means the server does not listen on
1499 * any IP interfaces; only Unix-domain sockets can be used to connect
1500 * to the server. Prevent external connections to minimize the chance
1501 * of failure.
1502 */
1503 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1504 if (opt->socket_dir)
1505 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1506 opt->socket_dir);
1507 appendPQExpBufferChar(pg_ctl_cmd, '"');
1508#endif
1509 }
1510 if (opt->config_file != NULL)
1511 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1512 opt->config_file);
1513
1514 /* Suppress to start logical replication if requested */
1515 if (restrict_logical_worker)
1516 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1517
1518 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1519 rc = system(pg_ctl_cmd->data);
1520 pg_ctl_status(pg_ctl_cmd->data, rc);
1521 standby_running = true;
1522 destroyPQExpBuffer(pg_ctl_cmd);
1523 pg_log_info("server was started");
1524}
1525
1526static void
1528{
1529 char *pg_ctl_cmd;
1530 int rc;
1531
1532 pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1533 datadir);
1534 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1535 rc = system(pg_ctl_cmd);
1536 pg_ctl_status(pg_ctl_cmd, rc);
1537 standby_running = false;
1538 pg_log_info("server was stopped");
1539}
1540
1541/*
1542 * Returns after the server finishes the recovery process.
1543 *
1544 * If recovery_timeout option is set, terminate abnormally without finishing
1545 * the recovery process. By default, it waits forever.
1546 *
1547 * XXX Is the recovery process still in progress? When recovery process has a
1548 * better progress reporting mechanism, it should be added here.
1549 */
1550static void
1551wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1552{
1553 PGconn *conn;
1554 int status = POSTMASTER_STILL_STARTING;
1555 int timer = 0;
1556
1557 pg_log_info("waiting for the target server to reach the consistent state");
1558
1559 conn = connect_database(conninfo, true);
1560
1561 for (;;)
1562 {
1563 bool in_recovery = server_is_in_recovery(conn);
1564
1565 /*
1566 * Does the recovery process finish? In dry run mode, there is no
1567 * recovery mode. Bail out as the recovery process has ended.
1568 */
1569 if (!in_recovery || dry_run)
1570 {
1571 status = POSTMASTER_READY;
1572 recovery_ended = true;
1573 break;
1574 }
1575
1576 /* Bail out after recovery_timeout seconds if this option is set */
1577 if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1578 {
1580 pg_log_error("recovery timed out");
1582 }
1583
1584 /* Keep waiting */
1586
1587 timer += WAIT_INTERVAL;
1588 }
1589
1590 disconnect_database(conn, false);
1591
1592 if (status == POSTMASTER_STILL_STARTING)
1593 pg_fatal("server did not end recovery");
1594
1595 pg_log_info("target server reached the consistent state");
1596 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1597}
1598
1599/*
1600 * Create a publication that includes all tables in the database.
1601 */
1602static void
1604{
1606 PGresult *res;
1607 char *ipubname_esc;
1608 char *spubname_esc;
1609
1610 Assert(conn != NULL);
1611
1612 ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1613 spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1614
1615 /* Check if the publication already exists */
1617 "SELECT 1 FROM pg_catalog.pg_publication "
1618 "WHERE pubname = %s",
1619 spubname_esc);
1620 res = PQexec(conn, str->data);
1621 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1622 {
1623 pg_log_error("could not obtain publication information: %s",
1626 }
1627
1628 if (PQntuples(res) == 1)
1629 {
1630 /*
1631 * Unfortunately, if it reaches this code path, it will always fail
1632 * (unless you decide to change the existing publication name). That's
1633 * bad but it is very unlikely that the user will choose a name with
1634 * pg_createsubscriber_ prefix followed by the exact database oid and
1635 * a random number.
1636 */
1637 pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1638 pg_log_error_hint("Consider renaming this publication before continuing.");
1640 }
1641
1642 PQclear(res);
1644
1645 pg_log_info("creating publication \"%s\" in database \"%s\"",
1646 dbinfo->pubname, dbinfo->dbname);
1647
1648 appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1649 ipubname_esc);
1650
1651 pg_log_debug("command is: %s", str->data);
1652
1653 if (!dry_run)
1654 {
1655 res = PQexec(conn, str->data);
1656 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1657 {
1658 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1659 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1661 }
1662 PQclear(res);
1663 }
1664
1665 /* For cleanup purposes */
1666 dbinfo->made_publication = true;
1667
1668 PQfreemem(ipubname_esc);
1669 PQfreemem(spubname_esc);
1671}
1672
1673/*
1674 * Drop the specified publication in the given database.
1675 */
1676static void
1677drop_publication(PGconn *conn, const char *pubname, const char *dbname,
1678 bool *made_publication)
1679{
1681 PGresult *res;
1682 char *pubname_esc;
1683
1684 Assert(conn != NULL);
1685
1686 pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1687
1688 pg_log_info("dropping publication \"%s\" in database \"%s\"",
1689 pubname, dbname);
1690
1691 appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1692
1693 PQfreemem(pubname_esc);
1694
1695 pg_log_debug("command is: %s", str->data);
1696
1697 if (!dry_run)
1698 {
1699 res = PQexec(conn, str->data);
1700 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1701 {
1702 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1703 pubname, dbname, PQresultErrorMessage(res));
1704 *made_publication = false; /* don't try again. */
1705
1706 /*
1707 * Don't disconnect and exit here. This routine is used by primary
1708 * (cleanup publication / replication slot due to an error) and
1709 * subscriber (remove the replicated publications). In both cases,
1710 * it can continue and provide instructions for the user to remove
1711 * it later if cleanup fails.
1712 */
1713 }
1714 PQclear(res);
1715 }
1716
1718}
1719
1720/*
1721 * Retrieve and drop the publications.
1722 *
1723 * Since the publications were created before the consistent LSN, they
1724 * remain on the subscriber even after the physical replica is
1725 * promoted. Remove these publications from the subscriber because
1726 * they have no use. Additionally, if requested, drop all pre-existing
1727 * publications.
1728 */
1729static void
1731{
1732 PGresult *res;
1734
1735 Assert(conn != NULL);
1736
1737 if (drop_all_pubs)
1738 {
1739 pg_log_info("dropping all existing publications in database \"%s\"",
1740 dbinfo->dbname);
1741
1742 /* Fetch all publication names */
1743 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1744 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1745 {
1746 pg_log_error("could not obtain publication information: %s",
1748 PQclear(res);
1750 }
1751
1752 /* Drop each publication */
1753 for (int i = 0; i < PQntuples(res); i++)
1754 drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1755 &dbinfo->made_publication);
1756
1757 PQclear(res);
1758 }
1759
1760 /*
1761 * In dry-run mode, we don't create publications, but we still try to drop
1762 * those to provide necessary information to the user.
1763 */
1764 if (!drop_all_pubs || dry_run)
1765 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1766 &dbinfo->made_publication);
1767}
1768
1769/*
1770 * Create a subscription with some predefined options.
1771 *
1772 * A replication slot was already created in a previous step. Let's use it. It
1773 * is not required to copy data. The subscription will be created but it will
1774 * not be enabled now. That's because the replication progress must be set and
1775 * the replication origin name (one of the function arguments) contains the
1776 * subscription OID in its name. Once the subscription is created,
1777 * set_replication_progress() can obtain the chosen origin name and set up its
1778 * initial location.
1779 */
1780static void
1782{
1784 PGresult *res;
1785 char *pubname_esc;
1786 char *subname_esc;
1787 char *pubconninfo_esc;
1788 char *replslotname_esc;
1789
1790 Assert(conn != NULL);
1791
1792 pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1793 subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1794 pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1795 replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1796
1797 pg_log_info("creating subscription \"%s\" in database \"%s\"",
1798 dbinfo->subname, dbinfo->dbname);
1799
1801 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1802 "WITH (create_slot = false, enabled = false, "
1803 "slot_name = %s, copy_data = false, two_phase = %s)",
1804 subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
1805 dbinfos.two_phase ? "true" : "false");
1806
1807 PQfreemem(pubname_esc);
1808 PQfreemem(subname_esc);
1809 PQfreemem(pubconninfo_esc);
1810 PQfreemem(replslotname_esc);
1811
1812 pg_log_debug("command is: %s", str->data);
1813
1814 if (!dry_run)
1815 {
1816 res = PQexec(conn, str->data);
1817 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1818 {
1819 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1820 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
1822 }
1823 PQclear(res);
1824 }
1825
1827}
1828
1829/*
1830 * Sets the replication progress to the consistent LSN.
1831 *
1832 * The subscriber caught up to the consistent LSN provided by the last
1833 * replication slot that was created. The goal is to set up the initial
1834 * location for the logical replication that is the exact LSN at which the
1835 * subscriber was promoted. Once the subscription is enabled it will start
1836 * streaming from that location onwards. In dry run mode, the subscription OID
1837 * and LSN are set to invalid values for printing purposes.
1838 */
1839static void
1840set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1841{
1843 PGresult *res;
1844 Oid suboid;
1845 char *subname;
1846 char *dbname;
1847 char *originname;
1848 char *lsnstr;
1849
1850 Assert(conn != NULL);
1851
1852 subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
1853 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1854
1856 "SELECT s.oid FROM pg_catalog.pg_subscription s "
1857 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1858 "WHERE s.subname = %s AND d.datname = %s",
1859 subname, dbname);
1860
1861 res = PQexec(conn, str->data);
1862 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1863 {
1864 pg_log_error("could not obtain subscription OID: %s",
1867 }
1868
1869 if (PQntuples(res) != 1 && !dry_run)
1870 {
1871 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1872 PQntuples(res), 1);
1874 }
1875
1876 if (dry_run)
1877 {
1878 suboid = InvalidOid;
1880 }
1881 else
1882 {
1883 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1884 lsnstr = psprintf("%s", lsn);
1885 }
1886
1887 PQclear(res);
1888
1889 /*
1890 * The origin name is defined as pg_%u. %u is the subscription OID. See
1891 * ApplyWorkerMain().
1892 */
1893 originname = psprintf("pg_%u", suboid);
1894
1895 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1896 originname, lsnstr, dbinfo->dbname);
1897
1900 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1901 originname, lsnstr);
1902
1903 pg_log_debug("command is: %s", str->data);
1904
1905 if (!dry_run)
1906 {
1907 res = PQexec(conn, str->data);
1908 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1909 {
1910 pg_log_error("could not set replication progress for subscription \"%s\": %s",
1911 dbinfo->subname, PQresultErrorMessage(res));
1913 }
1914 PQclear(res);
1915 }
1916
1919 pg_free(originname);
1920 pg_free(lsnstr);
1922}
1923
1924/*
1925 * Enables the subscription.
1926 *
1927 * The subscription was created in a previous step but it was disabled. After
1928 * adjusting the initial logical replication location, enable the subscription.
1929 */
1930static void
1932{
1934 PGresult *res;
1935 char *subname;
1936
1937 Assert(conn != NULL);
1938
1939 subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1940
1941 pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1942 dbinfo->subname, dbinfo->dbname);
1943
1944 appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1945
1946 pg_log_debug("command is: %s", str->data);
1947
1948 if (!dry_run)
1949 {
1950 res = PQexec(conn, str->data);
1951 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1952 {
1953 pg_log_error("could not enable subscription \"%s\": %s",
1954 dbinfo->subname, PQresultErrorMessage(res));
1956 }
1957
1958 PQclear(res);
1959 }
1960
1963}
1964
1965/*
1966 * Fetch a list of all connectable non-template databases from the source server
1967 * and form a list such that they appear as if the user has specified multiple
1968 * --database options, one for each source database.
1969 */
1970static void
1972 bool dbnamespecified)
1973{
1974 PGconn *conn;
1975 PGresult *res;
1976
1977 /* If a database name was specified, just connect to it. */
1978 if (dbnamespecified)
1980 else
1981 {
1982 /* Otherwise, try postgres first and then template1. */
1983 char *conninfo;
1984
1985 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
1986 conn = connect_database(conninfo, false);
1987 pg_free(conninfo);
1988 if (!conn)
1989 {
1990 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
1991 conn = connect_database(conninfo, true);
1992 pg_free(conninfo);
1993 }
1994 }
1995
1996 res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
1997 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1998 {
1999 pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2000 PQclear(res);
2002 }
2003
2004 for (int i = 0; i < PQntuples(res); i++)
2005 {
2006 const char *dbname = PQgetvalue(res, i, 0);
2007
2009
2010 /* Increment num_dbs to reflect multiple --database options */
2011 num_dbs++;
2012 }
2013
2014 PQclear(res);
2015 disconnect_database(conn, false);
2016}
2017
2018int
2019main(int argc, char **argv)
2020{
2021 static struct option long_options[] =
2022 {
2023 {"all", no_argument, NULL, 'a'},
2024 {"database", required_argument, NULL, 'd'},
2025 {"pgdata", required_argument, NULL, 'D'},
2026 {"dry-run", no_argument, NULL, 'n'},
2027 {"subscriber-port", required_argument, NULL, 'p'},
2028 {"publisher-server", required_argument, NULL, 'P'},
2029 {"remove", required_argument, NULL, 'R'},
2030 {"socketdir", required_argument, NULL, 's'},
2031 {"recovery-timeout", required_argument, NULL, 't'},
2032 {"enable-two-phase", no_argument, NULL, 'T'},
2033 {"subscriber-username", required_argument, NULL, 'U'},
2034 {"verbose", no_argument, NULL, 'v'},
2035 {"version", no_argument, NULL, 'V'},
2036 {"help", no_argument, NULL, '?'},
2037 {"config-file", required_argument, NULL, 1},
2038 {"publication", required_argument, NULL, 2},
2039 {"replication-slot", required_argument, NULL, 3},
2040 {"subscription", required_argument, NULL, 4},
2041 {NULL, 0, NULL, 0}
2042 };
2043
2044 struct CreateSubscriberOptions opt = {0};
2045
2046 int c;
2047 int option_index;
2048
2049 char *pub_base_conninfo;
2050 char *sub_base_conninfo;
2051 char *dbname_conninfo = NULL;
2052
2053 uint64 pub_sysid;
2054 uint64 sub_sysid;
2055 struct stat statbuf;
2056
2057 char *consistent_lsn;
2058
2059 char pidfile[MAXPGPATH];
2060
2061 pg_logging_init(argv[0]);
2063 progname = get_progname(argv[0]);
2064 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2065
2066 if (argc > 1)
2067 {
2068 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2069 {
2070 usage();
2071 exit(0);
2072 }
2073 else if (strcmp(argv[1], "-V") == 0
2074 || strcmp(argv[1], "--version") == 0)
2075 {
2076 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2077 exit(0);
2078 }
2079 }
2080
2081 /* Default settings */
2082 subscriber_dir = NULL;
2083 opt.config_file = NULL;
2084 opt.pub_conninfo_str = NULL;
2085 opt.socket_dir = NULL;
2087 opt.sub_username = NULL;
2088 opt.two_phase = false;
2090 {
2091 0
2092 };
2093 opt.recovery_timeout = 0;
2094 opt.all_dbs = false;
2095
2096 /*
2097 * Don't allow it to be run as root. It uses pg_ctl which does not allow
2098 * it either.
2099 */
2100#ifndef WIN32
2101 if (geteuid() == 0)
2102 {
2103 pg_log_error("cannot be executed by \"root\"");
2104 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2105 progname);
2106 exit(1);
2107 }
2108#endif
2109
2111
2112 while ((c = getopt_long(argc, argv, "ad:D:np:P:R:s:t:TU:v",
2113 long_options, &option_index)) != -1)
2114 {
2115 switch (c)
2116 {
2117 case 'a':
2118 opt.all_dbs = true;
2119 break;
2120 case 'd':
2122 {
2124 num_dbs++;
2125 }
2126 else
2127 pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2128 break;
2129 case 'D':
2132 break;
2133 case 'n':
2134 dry_run = true;
2135 break;
2136 case 'p':
2137 opt.sub_port = pg_strdup(optarg);
2138 break;
2139 case 'P':
2141 break;
2142 case 'R':
2145 else
2146 pg_fatal("object type \"%s\" is specified more than once for -R/--remove", optarg);
2147 break;
2148 case 's':
2151 break;
2152 case 't':
2153 opt.recovery_timeout = atoi(optarg);
2154 break;
2155 case 'T':
2156 opt.two_phase = true;
2157 break;
2158 case 'U':
2160 break;
2161 case 'v':
2163 break;
2164 case 1:
2166 break;
2167 case 2:
2169 {
2171 num_pubs++;
2172 }
2173 else
2174 pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
2175 break;
2176 case 3:
2178 {
2180 num_replslots++;
2181 }
2182 else
2183 pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2184 break;
2185 case 4:
2187 {
2189 num_subs++;
2190 }
2191 else
2192 pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2193 break;
2194 default:
2195 /* getopt_long already emitted a complaint */
2196 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2197 exit(1);
2198 }
2199 }
2200
2201 /* Validate that --all is not used with incompatible options */
2202 if (opt.all_dbs)
2203 {
2204 char *bad_switch = NULL;
2205
2206 if (num_dbs > 0)
2207 bad_switch = "--database";
2208 else if (num_pubs > 0)
2209 bad_switch = "--publication";
2210 else if (num_replslots > 0)
2211 bad_switch = "--replication-slot";
2212 else if (num_subs > 0)
2213 bad_switch = "--subscription";
2214
2215 if (bad_switch)
2216 {
2217 pg_log_error("%s cannot be used with -a/--all", bad_switch);
2218 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2219 exit(1);
2220 }
2221 }
2222
2223 /* Any non-option arguments? */
2224 if (optind < argc)
2225 {
2226 pg_log_error("too many command-line arguments (first is \"%s\")",
2227 argv[optind]);
2228 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2229 exit(1);
2230 }
2231
2232 /* Required arguments */
2233 if (subscriber_dir == NULL)
2234 {
2235 pg_log_error("no subscriber data directory specified");
2236 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2237 exit(1);
2238 }
2239
2240 /* If socket directory is not provided, use the current directory */
2241 if (opt.socket_dir == NULL)
2242 {
2243 char cwd[MAXPGPATH];
2244
2245 if (!getcwd(cwd, MAXPGPATH))
2246 pg_fatal("could not determine current directory");
2247 opt.socket_dir = pg_strdup(cwd);
2249 }
2250
2251 /*
2252 * Parse connection string. Build a base connection string that might be
2253 * reused by multiple databases.
2254 */
2255 if (opt.pub_conninfo_str == NULL)
2256 {
2257 /*
2258 * TODO use primary_conninfo (if available) from subscriber and
2259 * extract publisher connection string. Assume that there are
2260 * identical entries for physical and logical replication. If there is
2261 * not, we would fail anyway.
2262 */
2263 pg_log_error("no publisher connection string specified");
2264 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2265 exit(1);
2266 }
2267 pg_log_info("validating publisher connection string");
2268 pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2269 &dbname_conninfo);
2270 if (pub_base_conninfo == NULL)
2271 exit(1);
2272
2273 pg_log_info("validating subscriber connection string");
2274 sub_base_conninfo = get_sub_conninfo(&opt);
2275
2276 /*
2277 * Fetch all databases from the source (publisher) and treat them as if
2278 * the user specified has multiple --database options, one for each source
2279 * database.
2280 */
2281 if (opt.all_dbs)
2282 {
2283 bool dbnamespecified = (dbname_conninfo != NULL);
2284
2285 get_publisher_databases(&opt, dbnamespecified);
2286 }
2287
2288 if (opt.database_names.head == NULL)
2289 {
2290 pg_log_info("no database was specified");
2291
2292 /*
2293 * Try to obtain the dbname from the publisher conninfo. If dbname
2294 * parameter is not available, error out.
2295 */
2296 if (dbname_conninfo)
2297 {
2298 simple_string_list_append(&opt.database_names, dbname_conninfo);
2299 num_dbs++;
2300
2301 pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2302 dbname_conninfo);
2303 }
2304 else
2305 {
2306 pg_log_error("no database name specified");
2307 pg_log_error_hint("Try \"%s --help\" for more information.",
2308 progname);
2309 exit(1);
2310 }
2311 }
2312
2313 /* Number of object names must match number of databases */
2314 if (num_pubs > 0 && num_pubs != num_dbs)
2315 {
2316 pg_log_error("wrong number of publication names specified");
2317 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2318 num_pubs, num_dbs);
2319 exit(1);
2320 }
2321 if (num_subs > 0 && num_subs != num_dbs)
2322 {
2323 pg_log_error("wrong number of subscription names specified");
2324 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2325 num_subs, num_dbs);
2326 exit(1);
2327 }
2328 if (num_replslots > 0 && num_replslots != num_dbs)
2329 {
2330 pg_log_error("wrong number of replication slot names specified");
2331 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2333 exit(1);
2334 }
2335
2336 /* Verify the object types specified for removal from the subscriber */
2337 for (SimpleStringListCell *cell = opt.objecttypes_to_remove.head; cell; cell = cell->next)
2338 {
2339 if (pg_strcasecmp(cell->val, "publications") == 0)
2341 else
2342 {
2343 pg_log_error("invalid object type \"%s\" specified for -R/--remove", cell->val);
2344 pg_log_error_hint("The valid option is: \"publications\"");
2345 exit(1);
2346 }
2347 }
2348
2349 /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2350 pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2351 pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2352
2353 /* Rudimentary check for a data directory */
2355
2357
2358 /*
2359 * Store database information for publisher and subscriber. It should be
2360 * called before atexit() because its return is used in the
2361 * cleanup_objects_atexit().
2362 */
2363 dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2364
2365 /* Register a function to clean up objects in case of failure */
2366 atexit(cleanup_objects_atexit);
2367
2368 /*
2369 * Check if the subscriber data directory has the same system identifier
2370 * than the publisher data directory.
2371 */
2373 sub_sysid = get_standby_sysid(subscriber_dir);
2374 if (pub_sysid != sub_sysid)
2375 pg_fatal("subscriber data directory is not a copy of the source database cluster");
2376
2377 /* Subscriber PID file */
2378 snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2379
2380 /*
2381 * The standby server must not be running. If the server is started under
2382 * service manager and pg_createsubscriber stops it, the service manager
2383 * might react to this action and start the server again. Therefore,
2384 * refuse to proceed if the server is running to avoid possible failures.
2385 */
2386 if (stat(pidfile, &statbuf) == 0)
2387 {
2388 pg_log_error("standby server is running");
2389 pg_log_error_hint("Stop the standby server and try again.");
2390 exit(1);
2391 }
2392
2393 /*
2394 * Start a short-lived standby server with temporary parameters (provided
2395 * by command-line options). The goal is to avoid connections during the
2396 * transformation steps.
2397 */
2398 pg_log_info("starting the standby server with command-line options");
2399 start_standby_server(&opt, true, false);
2400
2401 /* Check if the standby server is ready for logical replication */
2403
2404 /* Check if the primary server is ready for logical replication */
2406
2407 /*
2408 * Stop the target server. The recovery process requires that the server
2409 * reaches a consistent state before targeting the recovery stop point.
2410 * Make sure a consistent state is reached (stop the target server
2411 * guarantees it) *before* creating the replication slots in
2412 * setup_publisher().
2413 */
2414 pg_log_info("stopping the subscriber");
2416
2417 /* Create the required objects for each database on publisher */
2418 consistent_lsn = setup_publisher(dbinfos.dbinfo);
2419
2420 /* Write the required recovery parameters */
2421 setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2422
2423 /*
2424 * Start subscriber so the recovery parameters will take effect. Wait
2425 * until accepting connections. We don't want to start logical replication
2426 * during setup.
2427 */
2428 pg_log_info("starting the subscriber");
2429 start_standby_server(&opt, true, true);
2430
2431 /* Waiting the subscriber to be promoted */
2433
2434 /*
2435 * Create the subscription for each database on subscriber. It does not
2436 * enable it immediately because it needs to adjust the replication start
2437 * point to the LSN reported by setup_publisher(). It also cleans up
2438 * publications created by this tool and replication to the standby.
2439 */
2440 setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2441
2442 /* Remove primary_slot_name if it exists on primary */
2444
2445 /* Remove failover replication slots if they exist on subscriber */
2447
2448 /* Stop the subscriber */
2449 pg_log_info("stopping the subscriber");
2451
2452 /* Change system identifier from subscriber */
2454
2455 success = true;
2456
2457 pg_log_info("Done!");
2458
2459 return 0;
2460}
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1185
uint32 bits32
Definition: c.h:511
uint64_t uint64
Definition: c.h:503
uint32_t uint32
Definition: c.h:502
int find_my_exec(const char *argv0, char *retpath)
Definition: exec.c:160
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:429
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition: exec.c:310
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
int errmsg(const char *fmt,...)
Definition: elog.c:1071
#define _(x)
Definition: elog.c:91
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:813
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7434
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:6150
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7556
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5290
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7619
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
void PQclear(PGresult *res)
Definition: fe-exec.c:721
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4363
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4369
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define pg_malloc_array(type, count)
Definition: fe_memutils.h:56
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:25
#define required_argument
Definition: getopt_long.h:26
Assert(PointerIsAligned(start, uint64))
const char * str
long val
Definition: informix.c:689
int i
Definition: isn.c:77
@ CONNECTION_OK
Definition: libpq-fe.h:84
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:128
void pg_logging_increase_verbosity(void)
Definition: logging.c:185
void pg_logging_init(const char *argv0)
Definition: logging.c:83
void pg_logging_set_level(enum pg_log_level new_level)
Definition: logging.c:176
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_info(...)
Definition: logging.h:124
#define pg_log_warning_hint(...)
Definition: logging.h:121
#define pg_log_info_hint(...)
Definition: logging.h:130
@ PG_LOG_WARNING
Definition: logging.h:38
#define pg_log_warning_detail(...)
Definition: logging.h:118
#define pg_log_error_detail(...)
Definition: logging.h:109
#define pg_log_debug(...)
Definition: logging.h:133
#define pg_fatal(...)
static PQExpBuffer recoveryconfcontents
#define MAXPGPATH
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
#define WAIT_INTERVAL
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
static void stop_standby_server(const char *datadir)
static char * pg_ctl_path
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static char * get_exec_path(const char *argv0, const char *progname)
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static char * subscriber_dir
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
int main(int argc, char **argv)
static char * primary_slot_name
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static pg_prng_state prng_state
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
static void check_data_directory(const char *datadir)
static void usage()
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
@ POSTMASTER_READY
@ POSTMASTER_STILL_STARTING
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
#define USEC_PER_SEC
static char * get_base_conninfo(const char *conninfo, char **dbname)
static struct LogicalRepInfos dbinfos
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
static bool success
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static bool recovery_ended
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static void drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void disconnect_database(PGconn *conn, bool exit_on_error)
static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static bool dry_run
static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
#define OBJECTTYPE_PUBLICATIONS
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static char * pg_resetwal_path
static char * generate_object_name(PGconn *conn)
static const char * progname
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
static char * argv0
Definition: pg_ctl.c:93
static char * exec_path
Definition: pg_ctl.c:88
PGDLLIMPORT int optind
Definition: getopt.c:51
PGDLLIMPORT char * optarg
Definition: getopt.c:53
uint32 pg_prng_uint32(pg_prng_state *state)
Definition: pg_prng.c:227
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition: pg_prng.c:89
char * datadir
NameData subname
static char * buf
Definition: pg_test_fsync.c:72
#define pg_log_warning(...)
Definition: pgfnames.c:24
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
const char * pg_strsignal(int signum)
Definition: pgstrsignal.c:39
void canonicalize_path(char *path)
Definition: path.c:337
#define snprintf
Definition: port.h:239
#define DEVNULL
Definition: port.h:161
const char * get_progname(const char *argv0)
Definition: path.c:652
#define printf(...)
Definition: port.h:245
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define InvalidOid
Definition: postgres_ext.h:35
unsigned int Oid
Definition: postgres_ext.h:30
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
char * c
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
Definition: recovery_gen.c:125
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
Definition: recovery_gen.c:28
void get_restricted_token(void)
void pg_usleep(long microsec)
Definition: signal.c:53
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition: simple_list.c:87
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition: simple_list.c:63
struct SimpleStringList SimpleStringList
char * dbname
Definition: streamutil.c:49
PGconn * conn
Definition: streamutil.c:52
void appendShellString(PQExpBuffer buf, const char *str)
Definition: string_utils.c:582
void appendConnStrVal(PQExpBuffer buf, const char *str)
Definition: string_utils.c:698
uint64 system_identifier
Definition: pg_control.h:110
SimpleStringList database_names
SimpleStringList objecttypes_to_remove
SimpleStringList replslot_names
struct LogicalRepInfo * dbinfo
char val[FLEXIBLE_ARRAY_MEMBER]
Definition: simple_list.h:37
struct SimpleStringListCell * next
Definition: simple_list.h:34
SimpleStringListCell * head
Definition: simple_list.h:42
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:33
#define stat
Definition: win32_port.h:274
#define WIFEXITED(w)
Definition: win32_port.h:150
#define WIFSIGNALED(w)
Definition: win32_port.h:151
#define WTERMSIG(w)
Definition: win32_port.h:153
#define WEXITSTATUS(w)
Definition: win32_port.h:152
int gettimeofday(struct timeval *tp, void *tzp)
int wal_level
Definition: xlog.c:131
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28