PostgreSQL Source Code git master
pg_createsubscriber.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * pg_createsubscriber.c
4 * Create a new logical replica from a standby server
5 *
6 * Copyright (c) 2024-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/bin/pg_basebackup/pg_createsubscriber.c
10 *
11 *-------------------------------------------------------------------------
12 */
13
14#include "postgres_fe.h"
15
16#include <sys/stat.h>
17#include <sys/time.h>
18#include <sys/wait.h>
19#include <time.h>
20
21#include "common/connect.h"
23#include "common/logging.h"
24#include "common/pg_prng.h"
26#include "datatype/timestamp.h"
30#include "fe_utils/version.h"
31#include "getopt_long.h"
32
33#define DEFAULT_SUB_PORT "50432"
34#define OBJECTTYPE_PUBLICATIONS 0x0001
35
36/* Command-line options */
38{
39 char *config_file; /* configuration file */
40 char *pub_conninfo_str; /* publisher connection string */
41 char *socket_dir; /* directory for Unix-domain socket, if any */
42 char *sub_port; /* subscriber port number */
43 const char *sub_username; /* subscriber username */
44 bool two_phase; /* enable-two-phase option */
45 SimpleStringList database_names; /* list of database names */
46 SimpleStringList pub_names; /* list of publication names */
47 SimpleStringList sub_names; /* list of subscription names */
48 SimpleStringList replslot_names; /* list of replication slot names */
49 int recovery_timeout; /* stop recovery after this time */
50 bool all_dbs; /* all option */
51 SimpleStringList objecttypes_to_clean; /* list of object types to cleanup */
52};
53
/*
 * Per-database publication/subscription info.
 *
 * The made_* flags record objects this run created on the publisher, so that
 * cleanup_objects_atexit() drops only those (and never pre-existing ones).
 */
struct LogicalRepInfo
{
	char	   *dbname;			/* database name */
	char	   *pubconninfo;	/* publisher connection string */
	char	   *subconninfo;	/* subscriber connection string */
	char	   *pubname;		/* publication name */
	char	   *subname;		/* subscription name */
	char	   *replslotname;	/* replication slot name */

	bool		made_replslot;	/* replication slot was created */
	bool		made_publication;	/* publication was created */
};
67
68/*
69 * Information shared across all the databases (or publications and
70 * subscriptions).
71 */
73{
75 bool two_phase; /* enable-two-phase option */
76 bits32 objecttypes_to_clean; /* flags indicating which object types
77 * to clean up on subscriber */
78};
79
80static void cleanup_objects_atexit(void);
81static void usage(void);
82static char *get_base_conninfo(const char *conninfo, char **dbname);
83static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
84static char *get_exec_path(const char *argv0, const char *progname);
85static void check_data_directory(const char *datadir);
86static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
87static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
88 const char *pub_base_conninfo,
89 const char *sub_base_conninfo);
90static PGconn *connect_database(const char *conninfo, bool exit_on_error);
91static void disconnect_database(PGconn *conn, bool exit_on_error);
92static uint64 get_primary_sysid(const char *conninfo);
93static uint64 get_standby_sysid(const char *datadir);
94static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
96static char *generate_object_name(PGconn *conn);
97static void check_publisher(const struct LogicalRepInfo *dbinfo);
98static char *setup_publisher(struct LogicalRepInfo *dbinfo);
99static void check_subscriber(const struct LogicalRepInfo *dbinfo);
100static void setup_subscriber(struct LogicalRepInfo *dbinfo,
101 const char *consistent_lsn);
102static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
103 const char *lsn);
104static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
105 const char *slotname);
106static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
108 struct LogicalRepInfo *dbinfo);
109static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
110 const char *slot_name);
111static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
112static void start_standby_server(const struct CreateSubscriberOptions *opt,
113 bool restricted_access,
114 bool restrict_logical_worker);
115static void stop_standby_server(const char *datadir);
116static void wait_for_end_recovery(const char *conninfo,
117 const struct CreateSubscriberOptions *opt);
118static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
119static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
120static void drop_publication(PGconn *conn, const char *pubname,
121 const char *dbname, bool *made_publication);
122static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
123static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
124static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
125 const char *lsn);
126static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
128 const struct LogicalRepInfo *dbinfo);
129static void drop_existing_subscription(PGconn *conn, const char *subname,
130 const char *dbname);
132 bool dbnamespecified);
133
134#define WAIT_INTERVAL 1 /* 1 second */
135
136static const char *progname;
137
138static char *primary_slot_name = NULL;
139static bool dry_run = false;
140
141static bool success = false;
142
144static int num_dbs = 0; /* number of specified databases */
145static int num_pubs = 0; /* number of specified publications */
146static int num_subs = 0; /* number of specified subscriptions */
147static int num_replslots = 0; /* number of specified replication slots */
148
150
151static char *pg_ctl_path = NULL;
152static char *pg_resetwal_path = NULL;
153
154/* standby / subscriber data directory */
155static char *subscriber_dir = NULL;
156
157static bool recovery_ended = false;
158static bool standby_running = false;
159
160
161/*
162 * Cleanup objects that were created by pg_createsubscriber if there is an
163 * error.
164 *
165 * Publications and replication slots are created on primary. Depending on the
166 * step it failed, it should remove the already created objects if it is
167 * possible (sometimes it won't work due to a connection issue).
168 * There is no cleanup on the target server. The steps on the target server are
169 * executed *after* promotion, hence, at this point, a failure means recreate
170 * the physical replica and start again.
171 */
172static void
174{
175 if (success)
176 return;
177
178 /*
179 * If the server is promoted, there is no way to use the current setup
180 * again. Warn the user that a new replication setup should be done before
181 * trying again.
182 */
183 if (recovery_ended)
184 {
185 pg_log_warning("failed after the end of recovery");
186 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
187 "You must recreate the physical replica before continuing.");
188 }
189
190 for (int i = 0; i < num_dbs; i++)
191 {
192 struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
193
194 if (dbinfo->made_publication || dbinfo->made_replslot)
195 {
196 PGconn *conn;
197
198 conn = connect_database(dbinfo->pubconninfo, false);
199 if (conn != NULL)
200 {
201 if (dbinfo->made_publication)
202 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
203 &dbinfo->made_publication);
204 if (dbinfo->made_replslot)
205 drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
207 }
208 else
209 {
210 /*
211 * If a connection could not be established, inform the user
212 * that some objects were left on primary and should be
213 * removed before trying again.
214 */
215 if (dbinfo->made_publication)
216 {
217 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
218 dbinfo->pubname,
219 dbinfo->dbname);
220 pg_log_warning_hint("Drop this publication before trying again.");
221 }
222 if (dbinfo->made_replslot)
223 {
224 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
225 dbinfo->replslotname,
226 dbinfo->dbname);
227 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
228 }
229 }
230 }
231 }
232
233 if (standby_running)
235}
236
237static void
238usage(void)
239{
240 printf(_("%s creates a new logical replica from a standby server.\n\n"),
241 progname);
242 printf(_("Usage:\n"));
243 printf(_(" %s [OPTION]...\n"), progname);
244 printf(_("\nOptions:\n"));
245 printf(_(" -a, --all create subscriptions for all databases except template\n"
246 " databases and databases that don't allow connections\n"));
247 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
248 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
249 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
250 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
251 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
252 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
253 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
254 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
255 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
256 printf(_(" -v, --verbose output verbose messages\n"));
257 printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
258 " databases on the subscriber; accepts: \"%s\"\n"), "publications");
259 printf(_(" --config-file=FILENAME use specified main server configuration\n"
260 " file when running target cluster\n"));
261 printf(_(" --publication=NAME publication name\n"));
262 printf(_(" --replication-slot=NAME replication slot name\n"));
263 printf(_(" --subscription=NAME subscription name\n"));
264 printf(_(" -V, --version output version information, then exit\n"));
265 printf(_(" -?, --help show this help, then exit\n"));
266 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
267 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
268}
269
270/*
271 * Subroutine to append "keyword=value" to a connection string,
272 * with proper quoting of the value. (We assume keywords don't need that.)
273 */
274static void
275appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
276{
277 if (buf->len > 0)
279 appendPQExpBufferStr(buf, keyword);
282}
283
284/*
285 * Validate a connection string. Returns a base connection string that is a
286 * connection string without a database name.
287 *
288 * Since we might process multiple databases, each database name will be
289 * appended to this base connection string to provide a final connection
290 * string. If the second argument (dbname) is not null, returns dbname if the
291 * provided connection string contains it.
292 *
293 * It is the caller's responsibility to free the returned connection string and
294 * dbname.
295 */
296static char *
297get_base_conninfo(const char *conninfo, char **dbname)
298{
300 PQconninfoOption *conn_opts;
301 PQconninfoOption *conn_opt;
302 char *errmsg = NULL;
303 char *ret;
304
305 conn_opts = PQconninfoParse(conninfo, &errmsg);
306 if (conn_opts == NULL)
307 {
308 pg_log_error("could not parse connection string: %s", errmsg);
310 return NULL;
311 }
312
314 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
315 {
316 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
317 {
318 if (strcmp(conn_opt->keyword, "dbname") == 0)
319 {
320 if (dbname)
321 *dbname = pg_strdup(conn_opt->val);
322 continue;
323 }
324 appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
325 }
326 }
327
328 ret = pg_strdup(buf->data);
329
331 PQconninfoFree(conn_opts);
332
333 return ret;
334}
335
336/*
337 * Build a subscriber connection string. Only a few parameters are supported
338 * since it starts a server with restricted access.
339 */
340static char *
342{
344 char *ret;
345
346 appendConnStrItem(buf, "port", opt->sub_port);
347#if !defined(WIN32)
348 appendConnStrItem(buf, "host", opt->socket_dir);
349#endif
350 if (opt->sub_username != NULL)
351 appendConnStrItem(buf, "user", opt->sub_username);
352 appendConnStrItem(buf, "fallback_application_name", progname);
353
354 ret = pg_strdup(buf->data);
355
357
358 return ret;
359}
360
361/*
362 * Verify if a PostgreSQL binary (progname) is available in the same directory as
363 * pg_createsubscriber and it has the same version. It returns the absolute
364 * path of the progname.
365 */
366static char *
367get_exec_path(const char *argv0, const char *progname)
368{
369 char *versionstr;
370 char *exec_path;
371 int ret;
372
373 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
375 ret = find_other_exec(argv0, progname, versionstr, exec_path);
376
377 if (ret < 0)
378 {
379 char full_path[MAXPGPATH];
380
381 if (find_my_exec(argv0, full_path) < 0)
382 strlcpy(full_path, progname, sizeof(full_path));
383
384 if (ret == -1)
385 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
386 progname, "pg_createsubscriber", full_path);
387 else
388 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
389 progname, full_path, "pg_createsubscriber");
390 }
391
392 pg_log_debug("%s path is: %s", progname, exec_path);
393
394 return exec_path;
395}
396
397/*
398 * Is it a cluster directory? These are preliminary checks. It is far from
399 * making an accurate check. If it is not a clone from the publisher, it will
400 * eventually fail in a future step.
401 */
402static void
404{
405 struct stat statbuf;
406 uint32 major_version;
407 char *version_str;
408
409 pg_log_info("checking if directory \"%s\" is a cluster data directory",
410 datadir);
411
412 if (stat(datadir, &statbuf) != 0)
413 {
414 if (errno == ENOENT)
415 pg_fatal("data directory \"%s\" does not exist", datadir);
416 else
417 pg_fatal("could not access directory \"%s\": %m", datadir);
418 }
419
420 /*
421 * Retrieve the contents of this cluster's PG_VERSION. We require
422 * compatibility with the same major version as the one this tool is
423 * compiled with.
424 */
425 major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
426 if (major_version != PG_MAJORVERSION_NUM)
427 {
428 pg_log_error("data directory is of wrong version");
429 pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
430 "PG_VERSION", version_str, PG_MAJORVERSION);
431 exit(1);
432 }
433}
434
435/*
436 * Append database name into a base connection string.
437 *
438 * dbname is the only parameter that changes so it is not included in the base
439 * connection string. This function concatenates dbname to build a "real"
440 * connection string.
441 */
442static char *
443concat_conninfo_dbname(const char *conninfo, const char *dbname)
444{
446 char *ret;
447
448 Assert(conninfo != NULL);
449
450 appendPQExpBufferStr(buf, conninfo);
451 appendConnStrItem(buf, "dbname", dbname);
452
453 ret = pg_strdup(buf->data);
455
456 return ret;
457}
458
459/*
460 * Store publication and subscription information.
461 *
462 * If publication, replication slot and subscription names were specified,
463 * store it here. Otherwise, a generated name will be assigned to the object in
464 * setup_publisher().
465 */
466static struct LogicalRepInfo *
468 const char *pub_base_conninfo,
469 const char *sub_base_conninfo)
470{
471 struct LogicalRepInfo *dbinfo;
472 SimpleStringListCell *pubcell = NULL;
473 SimpleStringListCell *subcell = NULL;
474 SimpleStringListCell *replslotcell = NULL;
475 int i = 0;
476
477 dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
478
479 if (num_pubs > 0)
480 pubcell = opt->pub_names.head;
481 if (num_subs > 0)
482 subcell = opt->sub_names.head;
483 if (num_replslots > 0)
484 replslotcell = opt->replslot_names.head;
485
486 for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
487 {
488 char *conninfo;
489
490 /* Fill publisher attributes */
491 conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
492 dbinfo[i].pubconninfo = conninfo;
493 dbinfo[i].dbname = cell->val;
494 if (num_pubs > 0)
495 dbinfo[i].pubname = pubcell->val;
496 else
497 dbinfo[i].pubname = NULL;
498 if (num_replslots > 0)
499 dbinfo[i].replslotname = replslotcell->val;
500 else
501 dbinfo[i].replslotname = NULL;
502 dbinfo[i].made_replslot = false;
503 dbinfo[i].made_publication = false;
504 /* Fill subscriber attributes */
505 conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
506 dbinfo[i].subconninfo = conninfo;
507 if (num_subs > 0)
508 dbinfo[i].subname = subcell->val;
509 else
510 dbinfo[i].subname = NULL;
511 /* Other fields will be filled later */
512
513 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
514 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
515 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
516 dbinfo[i].pubconninfo);
517 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
518 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
519 dbinfo[i].subconninfo,
520 dbinfos.two_phase ? "true" : "false");
521
522 if (num_pubs > 0)
523 pubcell = pubcell->next;
524 if (num_subs > 0)
525 subcell = subcell->next;
526 if (num_replslots > 0)
527 replslotcell = replslotcell->next;
528
529 i++;
530 }
531
532 return dbinfo;
533}
534
535/*
536 * Open a new connection. If exit_on_error is true, it has an undesired
537 * condition and it should exit immediately.
538 */
539static PGconn *
540connect_database(const char *conninfo, bool exit_on_error)
541{
542 PGconn *conn;
543 PGresult *res;
544
545 conn = PQconnectdb(conninfo);
547 {
548 pg_log_error("connection to database failed: %s",
550 PQfinish(conn);
551
552 if (exit_on_error)
553 exit(1);
554 return NULL;
555 }
556
557 /* Secure search_path */
560 {
561 pg_log_error("could not clear \"search_path\": %s",
563 PQclear(res);
564 PQfinish(conn);
565
566 if (exit_on_error)
567 exit(1);
568 return NULL;
569 }
570 PQclear(res);
571
572 return conn;
573}
574
575/*
576 * Close the connection. If exit_on_error is true, it has an undesired
577 * condition and it should exit immediately.
578 */
579static void
580disconnect_database(PGconn *conn, bool exit_on_error)
581{
582 Assert(conn != NULL);
583
584 PQfinish(conn);
585
586 if (exit_on_error)
587 exit(1);
588}
589
590/*
591 * Obtain the system identifier using the provided connection. It will be used
592 * to compare if a data directory is a clone of another one.
593 */
594static uint64
595get_primary_sysid(const char *conninfo)
596{
597 PGconn *conn;
598 PGresult *res;
599 uint64 sysid;
600
601 pg_log_info("getting system identifier from publisher");
602
603 conn = connect_database(conninfo, true);
604
605 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
607 {
608 pg_log_error("could not get system identifier: %s",
611 }
612 if (PQntuples(res) != 1)
613 {
614 pg_log_error("could not get system identifier: got %d rows, expected %d row",
615 PQntuples(res), 1);
617 }
618
619 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
620
621 pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
622
623 PQclear(res);
625
626 return sysid;
627}
628
629/*
630 * Obtain the system identifier from control file. It will be used to compare
631 * if a data directory is a clone of another one. This routine is used locally
632 * and avoids a connection.
633 */
634static uint64
636{
637 ControlFileData *cf;
638 bool crc_ok;
639 uint64 sysid;
640
641 pg_log_info("getting system identifier from subscriber");
642
643 cf = get_controlfile(datadir, &crc_ok);
644 if (!crc_ok)
645 pg_fatal("control file appears to be corrupt");
646
647 sysid = cf->system_identifier;
648
649 pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
650
651 pg_free(cf);
652
653 return sysid;
654}
655
656/*
657 * Modify the system identifier. Since a standby server preserves the system
658 * identifier, it makes sense to change it to avoid situations in which WAL
659 * files from one of the systems might be used in the other one.
660 */
661static void
663{
664 ControlFileData *cf;
665 bool crc_ok;
666 struct timeval tv;
667
668 char *cmd_str;
669
670 pg_log_info("modifying system identifier of subscriber");
671
672 cf = get_controlfile(subscriber_dir, &crc_ok);
673 if (!crc_ok)
674 pg_fatal("control file appears to be corrupt");
675
676 /*
677 * Select a new system identifier.
678 *
679 * XXX this code was extracted from BootStrapXLOG().
680 */
681 gettimeofday(&tv, NULL);
682 cf->system_identifier = ((uint64) tv.tv_sec) << 32;
683 cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
684 cf->system_identifier |= getpid() & 0xFFF;
685
686 if (dry_run)
687 pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
689 else
690 {
692 pg_log_info("system identifier is %" PRIu64 " on subscriber",
694 }
695
696 if (dry_run)
697 pg_log_info("dry-run: would run pg_resetwal on the subscriber");
698 else
699 pg_log_info("running pg_resetwal on the subscriber");
700
701 cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
703
704 pg_log_debug("pg_resetwal command is: %s", cmd_str);
705
706 if (!dry_run)
707 {
708 int rc = system(cmd_str);
709
710 if (rc == 0)
711 pg_log_info("successfully reset WAL on the subscriber");
712 else
713 pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
714 }
715
716 pg_free(cf);
717}
718
719/*
720 * Generate an object name using a prefix, database oid and a random integer.
721 * It is used in case the user does not specify an object name (publication,
722 * subscription, replication slot).
723 */
724static char *
726{
727 PGresult *res;
728 Oid oid;
729 uint32 rand;
730 char *objname;
731
732 res = PQexec(conn,
733 "SELECT oid FROM pg_catalog.pg_database "
734 "WHERE datname = pg_catalog.current_database()");
736 {
737 pg_log_error("could not obtain database OID: %s",
740 }
741
742 if (PQntuples(res) != 1)
743 {
744 pg_log_error("could not obtain database OID: got %d rows, expected %d row",
745 PQntuples(res), 1);
747 }
748
749 /* Database OID */
750 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
751
752 PQclear(res);
753
754 /* Random unsigned integer */
755 rand = pg_prng_uint32(&prng_state);
756
757 /*
758 * Build the object name. The name must not exceed NAMEDATALEN - 1. This
759 * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
760 * '\0').
761 */
762 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
763
764 return objname;
765}
766
767/*
768 * Does the publication exist in the specified database?
769 */
770static bool
771find_publication(PGconn *conn, const char *pubname, const char *dbname)
772{
774 PGresult *res;
775 bool found = false;
776 char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
777
779 "SELECT 1 FROM pg_catalog.pg_publication "
780 "WHERE pubname = %s",
781 pubname_esc);
782 res = PQexec(conn, str->data);
784 {
785 pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
786 pubname, dbname, PQerrorMessage(conn));
788 }
789
790 if (PQntuples(res) == 1)
791 found = true;
792
793 PQclear(res);
794 PQfreemem(pubname_esc);
796
797 return found;
798}
799
800/*
801 * Create the publications and replication slots in preparation for logical
802 * replication. Returns the LSN from latest replication slot. It will be the
803 * replication start point that is used to adjust the subscriptions (see
804 * set_replication_progress).
805 */
806static char *
808{
809 char *lsn = NULL;
810
811 pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
812
813 for (int i = 0; i < num_dbs; i++)
814 {
815 PGconn *conn;
816 char *genname = NULL;
817
818 conn = connect_database(dbinfo[i].pubconninfo, true);
819
820 /*
821 * If an object name was not specified as command-line options, assign
822 * a generated object name. The replication slot has a different rule.
823 * The subscription name is assigned to the replication slot name if
824 * no replication slot is specified. It follows the same rule as
825 * CREATE SUBSCRIPTION.
826 */
827 if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
828 genname = generate_object_name(conn);
829 if (num_pubs == 0)
830 dbinfo[i].pubname = pg_strdup(genname);
831 if (num_subs == 0)
832 dbinfo[i].subname = pg_strdup(genname);
833 if (num_replslots == 0)
834 dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
835
836 if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
837 {
838 /* Reuse existing publication on publisher. */
839 pg_log_info("use existing publication \"%s\" in database \"%s\"",
840 dbinfo[i].pubname, dbinfo[i].dbname);
841 /* Don't remove pre-existing publication if an error occurs. */
842 dbinfo[i].made_publication = false;
843 }
844 else
845 {
846 /*
847 * Create publication on publisher. This step should be executed
848 * *before* promoting the subscriber to avoid any transactions
849 * between consistent LSN and the new publication rows (such
850 * transactions wouldn't see the new publication rows resulting in
851 * an error).
852 */
853 create_publication(conn, &dbinfo[i]);
854 }
855
856 /* Create replication slot on publisher */
857 if (lsn)
858 pg_free(lsn);
859 lsn = create_logical_replication_slot(conn, &dbinfo[i]);
860 if (lsn == NULL && !dry_run)
861 exit(1);
862
863 /*
864 * Since we are using the LSN returned by the last replication slot as
865 * recovery_target_lsn, this LSN is ahead of the current WAL position
866 * and the recovery waits until the publisher writes a WAL record to
867 * reach the target and ends the recovery. On idle systems, this wait
868 * time is unpredictable and could lead to failure in promoting the
869 * subscriber. To avoid that, insert a harmless WAL record.
870 */
871 if (i == num_dbs - 1 && !dry_run)
872 {
873 PGresult *res;
874
875 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
877 {
878 pg_log_error("could not write an additional WAL record: %s",
881 }
882 PQclear(res);
883 }
884
886 }
887
888 return lsn;
889}
890
891/*
892 * Is recovery still in progress?
893 */
894static bool
896{
897 PGresult *res;
898 int ret;
899
900 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
901
903 {
904 pg_log_error("could not obtain recovery progress: %s",
907 }
908
909
910 ret = strcmp("t", PQgetvalue(res, 0, 0));
911
912 PQclear(res);
913
914 return ret == 0;
915}
916
917/*
918 * Is the primary server ready for logical replication?
919 *
920 * XXX Does it not allow a synchronous replica?
921 */
922static void
923check_publisher(const struct LogicalRepInfo *dbinfo)
924{
925 PGconn *conn;
926 PGresult *res;
927 bool failed = false;
928
929 char *wal_level;
930 int max_repslots;
931 int cur_repslots;
932 int max_walsenders;
933 int cur_walsenders;
934 int max_prepared_transactions;
935 char *max_slot_wal_keep_size;
936
937 pg_log_info("checking settings on publisher");
938
939 conn = connect_database(dbinfo[0].pubconninfo, true);
940
941 /*
942 * If the primary server is in recovery (i.e. cascading replication),
943 * objects (publication) cannot be created because it is read only.
944 */
946 {
947 pg_log_error("primary server cannot be in recovery");
949 }
950
951 /*------------------------------------------------------------------------
952 * Logical replication requires a few parameters to be set on publisher.
953 * Since these parameters are not a requirement for physical replication,
954 * we should check it to make sure it won't fail.
955 *
956 * - wal_level >= replica
957 * - max_replication_slots >= current + number of dbs to be converted
958 * - max_wal_senders >= current + number of dbs to be converted
959 * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
960 * -----------------------------------------------------------------------
961 */
962 res = PQexec(conn,
963 "SELECT pg_catalog.current_setting('wal_level'),"
964 " pg_catalog.current_setting('max_replication_slots'),"
965 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
966 " pg_catalog.current_setting('max_wal_senders'),"
967 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
968 " pg_catalog.current_setting('max_prepared_transactions'),"
969 " pg_catalog.current_setting('max_slot_wal_keep_size')");
970
972 {
973 pg_log_error("could not obtain publisher settings: %s",
976 }
977
978 wal_level = pg_strdup(PQgetvalue(res, 0, 0));
979 max_repslots = atoi(PQgetvalue(res, 0, 1));
980 cur_repslots = atoi(PQgetvalue(res, 0, 2));
981 max_walsenders = atoi(PQgetvalue(res, 0, 3));
982 cur_walsenders = atoi(PQgetvalue(res, 0, 4));
983 max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
984 max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
985
986 PQclear(res);
987
988 pg_log_debug("publisher: wal_level: %s", wal_level);
989 pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
990 pg_log_debug("publisher: current replication slots: %d", cur_repslots);
991 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
992 pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
993 pg_log_debug("publisher: max_prepared_transactions: %d",
994 max_prepared_transactions);
995 pg_log_debug("publisher: max_slot_wal_keep_size: %s",
996 max_slot_wal_keep_size);
997
999
1000 if (strcmp(wal_level, "minimal") == 0)
1001 {
1002 pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
1003 failed = true;
1004 }
1005
1006 if (max_repslots - cur_repslots < num_dbs)
1007 {
1008 pg_log_error("publisher requires %d replication slots, but only %d remain",
1009 num_dbs, max_repslots - cur_repslots);
1010 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1011 "max_replication_slots", cur_repslots + num_dbs);
1012 failed = true;
1013 }
1014
1015 if (max_walsenders - cur_walsenders < num_dbs)
1016 {
1017 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
1018 num_dbs, max_walsenders - cur_walsenders);
1019 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1020 "max_wal_senders", cur_walsenders + num_dbs);
1021 failed = true;
1022 }
1023
1024 if (max_prepared_transactions != 0 && !dbinfos.two_phase)
1025 {
1026 pg_log_warning("two_phase option will not be enabled for replication slots");
1027 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
1028 "Prepared transactions will be replicated at COMMIT PREPARED.");
1029 pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
1030 }
1031
1032 /*
1033 * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
1034 * is set to a non-default value, it may cause replication failures due to
1035 * required WAL files being prematurely removed.
1036 */
1037 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
1038 {
1039 pg_log_warning("required WAL could be removed from the publisher");
1040 pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1041 "max_slot_wal_keep_size");
1042 }
1043
1045
1046 if (failed)
1047 exit(1);
1048}
1049
1050/*
1051 * Is the standby server ready for logical replication?
1052 *
1053 * XXX Does it not allow a time-delayed replica?
1054 *
1055 * XXX In a cascaded replication scenario (P -> S -> C), if the target server
1056 * is S, it cannot detect there is a replica (server C) because server S starts
1057 * accepting only local connections and server C cannot connect to it. Hence,
1058 * there is not a reliable way to provide a suitable error saying the server C
1059 * will be broken at the end of this process (due to pg_resetwal).
1060 */
1061static void
1063{
1064 PGconn *conn;
1065 PGresult *res;
1066 bool failed = false;
1067
1068 int max_lrworkers;
1069 int max_reporigins;
1070 int max_wprocs;
1071
1072 pg_log_info("checking settings on subscriber");
1073
1074 conn = connect_database(dbinfo[0].subconninfo, true);
1075
1076 /* The target server must be a standby */
1078 {
1079 pg_log_error("target server must be a standby");
1081 }
1082
1083 /*------------------------------------------------------------------------
1084 * Logical replication requires a few parameters to be set on subscriber.
1085 * Since these parameters are not a requirement for physical replication,
1086 * we should check it to make sure it won't fail.
1087 *
1088 * - max_active_replication_origins >= number of dbs to be converted
1089 * - max_logical_replication_workers >= number of dbs to be converted
1090 * - max_worker_processes >= 1 + number of dbs to be converted
1091 *------------------------------------------------------------------------
1092 */
1093 res = PQexec(conn,
1094 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1095 "'max_logical_replication_workers', "
1096 "'max_active_replication_origins', "
1097 "'max_worker_processes', "
1098 "'primary_slot_name') "
1099 "ORDER BY name");
1100
1101 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1102 {
1103 pg_log_error("could not obtain subscriber settings: %s",
1106 }
1107
1108 max_reporigins = atoi(PQgetvalue(res, 0, 0));
1109 max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1110 max_wprocs = atoi(PQgetvalue(res, 2, 0));
1111 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1113
1114 pg_log_debug("subscriber: max_logical_replication_workers: %d",
1115 max_lrworkers);
1116 pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
1117 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1119 pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1120
1121 PQclear(res);
1122
1123 disconnect_database(conn, false);
1124
1125 if (max_reporigins < num_dbs)
1126 {
1127 pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1128 num_dbs, max_reporigins);
1129 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1130 "max_active_replication_origins", num_dbs);
1131 failed = true;
1132 }
1133
1134 if (max_lrworkers < num_dbs)
1135 {
1136 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1137 num_dbs, max_lrworkers);
1138 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1139 "max_logical_replication_workers", num_dbs);
1140 failed = true;
1141 }
1142
1143 if (max_wprocs < num_dbs + 1)
1144 {
1145 pg_log_error("subscriber requires %d worker processes, but only %d remain",
1146 num_dbs + 1, max_wprocs);
1147 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1148 "max_worker_processes", num_dbs + 1);
1149 failed = true;
1150 }
1151
1152 if (failed)
1153 exit(1);
1154}
1155
1156/*
1157 * Drop a specified subscription. This is to avoid duplicate subscriptions on
1158 * the primary (publisher node) and the newly created subscriber. We
1159 * shouldn't drop the associated slot as that would be used by the publisher
1160 * node.
1161 */
1162static void
1164{
1166 PGresult *res;
1167
1168 Assert(conn != NULL);
1169
1170 /*
1171 * Construct a query string. These commands are allowed to be executed
1172 * within a transaction.
1173 */
1174 appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1175 subname);
1176 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1177 subname);
1178 appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1179
1180 if (dry_run)
1181 pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
1182 subname, dbname);
1183 else
1184 {
1185 pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1186 subname, dbname);
1187
1188 res = PQexec(conn, query->data);
1189
1190 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1191 {
1192 pg_log_error("could not drop subscription \"%s\": %s",
1195 }
1196
1197 PQclear(res);
1198 }
1199
1200 destroyPQExpBuffer(query);
1201}
1202
1203/*
1204 * Retrieve and drop the pre-existing subscriptions.
1205 */
1206static void
1208 const struct LogicalRepInfo *dbinfo)
1209{
1211 char *dbname;
1212 PGresult *res;
1213
1214 Assert(conn != NULL);
1215
1216 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1217
1218 appendPQExpBuffer(query,
1219 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1220 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1221 "WHERE d.datname = %s",
1222 dbname);
1223 res = PQexec(conn, query->data);
1224
1225 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1226 {
1227 pg_log_error("could not obtain pre-existing subscriptions: %s",
1230 }
1231
1232 for (int i = 0; i < PQntuples(res); i++)
1234 dbinfo->dbname);
1235
1236 PQclear(res);
1237 destroyPQExpBuffer(query);
1239}
1240
1241/*
1242 * Create the subscriptions, adjust the initial location for logical
1243 * replication and enable the subscriptions. That's the last step for logical
1244 * replication setup.
1245 */
1246static void
1247setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1248{
1249 for (int i = 0; i < num_dbs; i++)
1250 {
1251 PGconn *conn;
1252
1253 /* Connect to subscriber. */
1254 conn = connect_database(dbinfo[i].subconninfo, true);
1255
1256 /*
1257 * We don't need the pre-existing subscriptions on the newly formed
1258 * subscriber. They can connect to other publisher nodes and either
1259 * get some unwarranted data or can lead to ERRORs in connecting to
1260 * such nodes.
1261 */
1263
1264 /* Check and drop the required publications in the given database. */
1266
1267 create_subscription(conn, &dbinfo[i]);
1268
1269 /* Set the replication progress to the correct LSN */
1270 set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1271
1272 /* Enable subscription */
1273 enable_subscription(conn, &dbinfo[i]);
1274
1275 disconnect_database(conn, false);
1276 }
1277}
1278
1279/*
1280 * Write the required recovery parameters.
1281 */
1282static void
1283setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1284{
1285 PGconn *conn;
1287
1288 /*
1289 * Despite of the recovery parameters will be written to the subscriber,
1290 * use a publisher connection. The primary_conninfo is generated using the
1291 * connection settings.
1292 */
1293 conn = connect_database(dbinfo[0].pubconninfo, true);
1294
1295 /*
1296 * Write recovery parameters.
1297 *
1298 * The subscriber is not running yet. In dry run mode, the recovery
1299 * parameters *won't* be written. An invalid LSN is used for printing
1300 * purposes. Additional recovery parameters are added here. It avoids
1301 * unexpected behavior such as end of recovery as soon as a consistent
1302 * state is reached (recovery_target) and failure due to multiple recovery
1303 * targets (name, time, xid, LSN).
1304 */
1306 appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1308 "recovery_target_timeline = 'latest'\n");
1309
1310 /*
1311 * Set recovery_target_inclusive = false to avoid reapplying the
1312 * transaction committed at 'lsn' after subscription is enabled. This is
1313 * because the provided 'lsn' is also used as the replication start point
1314 * for the subscription. So, the server can send the transaction committed
1315 * at that 'lsn' after replication is started which can lead to applying
1316 * the same transaction twice if we keep recovery_target_inclusive = true.
1317 */
1319 "recovery_target_inclusive = false\n");
1321 "recovery_target_action = promote\n");
1322 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1323 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1324 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1325
1326 if (dry_run)
1327 {
1328 appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
1330 "recovery_target_lsn = '%X/%08X'\n",
1332 }
1333 else
1334 {
1335 appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1336 lsn);
1338 }
1339 disconnect_database(conn, false);
1340
1341 pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1342}
1343
1344/*
1345 * Drop physical replication slot on primary if the standby was using it. After
1346 * the transformation, it has no use.
1347 *
1348 * XXX we might not fail here. Instead, we provide a warning so the user
1349 * eventually drops this replication slot later.
1350 */
1351static void
1352drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1353{
1354 PGconn *conn;
1355
1356 /* Replication slot does not exist, do nothing */
1357 if (!primary_slot_name)
1358 return;
1359
1360 conn = connect_database(dbinfo[0].pubconninfo, false);
1361 if (conn != NULL)
1362 {
1363 drop_replication_slot(conn, &dbinfo[0], slotname);
1364 disconnect_database(conn, false);
1365 }
1366 else
1367 {
1368 pg_log_warning("could not drop replication slot \"%s\" on primary",
1369 slotname);
1370 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1371 }
1372}
1373
1374/*
1375 * Drop failover replication slots on subscriber. After the transformation,
1376 * they have no use.
1377 *
1378 * XXX We do not fail here. Instead, we provide a warning so the user can drop
1379 * them later.
1380 */
1381static void
1383{
1384 PGconn *conn;
1385 PGresult *res;
1386
1387 conn = connect_database(dbinfo[0].subconninfo, false);
1388 if (conn != NULL)
1389 {
1390 /* Get failover replication slot names */
1391 res = PQexec(conn,
1392 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1393
1394 if (PQresultStatus(res) == PGRES_TUPLES_OK)
1395 {
1396 /* Remove failover replication slots from subscriber */
1397 for (int i = 0; i < PQntuples(res); i++)
1398 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1399 }
1400 else
1401 {
1402 pg_log_warning("could not obtain failover replication slot information: %s",
1404 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1405 }
1406
1407 PQclear(res);
1408 disconnect_database(conn, false);
1409 }
1410 else
1411 {
1412 pg_log_warning("could not drop failover replication slot");
1413 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1414 }
1415}
1416
1417/*
1418 * Create a logical replication slot and return its LSN.
1419 *
1420 * CreateReplicationSlot() is not used because it does not provide the one-row
1421 * result set that contains the LSN.
1422 */
1423static char *
1425{
1427 PGresult *res = NULL;
1428 const char *slot_name = dbinfo->replslotname;
1429 char *slot_name_esc;
1430 char *lsn = NULL;
1431
1432 Assert(conn != NULL);
1433
1434 if (dry_run)
1435 pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1436 slot_name, dbinfo->dbname);
1437 else
1438 pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
1439 slot_name, dbinfo->dbname);
1440
1441 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1442
1444 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1445 slot_name_esc,
1446 dbinfos.two_phase ? "true" : "false");
1447
1448 PQfreemem(slot_name_esc);
1449
1450 pg_log_debug("command is: %s", str->data);
1451
1452 if (!dry_run)
1453 {
1454 res = PQexec(conn, str->data);
1455 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1456 {
1457 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1458 slot_name, dbinfo->dbname,
1460 PQclear(res);
1462 return NULL;
1463 }
1464
1465 lsn = pg_strdup(PQgetvalue(res, 0, 0));
1466 PQclear(res);
1467 }
1468
1469 /* For cleanup purposes */
1470 dbinfo->made_replslot = true;
1471
1473
1474 return lsn;
1475}
1476
1477static void
1479 const char *slot_name)
1480{
1482 char *slot_name_esc;
1483 PGresult *res;
1484
1485 Assert(conn != NULL);
1486
1487 if (dry_run)
1488 pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1489 slot_name, dbinfo->dbname);
1490 else
1491 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1492 slot_name, dbinfo->dbname);
1493
1494 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1495
1496 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1497
1498 PQfreemem(slot_name_esc);
1499
1500 pg_log_debug("command is: %s", str->data);
1501
1502 if (!dry_run)
1503 {
1504 res = PQexec(conn, str->data);
1505 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1506 {
1507 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1508 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1509 dbinfo->made_replslot = false; /* don't try again. */
1510 }
1511
1512 PQclear(res);
1513 }
1514
1516}
1517
/*
 * Report a suitable message and exit if pg_ctl failed.
 *
 * pg_ctl_cmd is the command line that was executed (used only in the error
 * detail) and rc is the value system() returned for it.  rc == 0 means
 * success and the function simply returns.  Any other value is decoded into
 * an error message, the failed command is logged, and the program exits
 * with status 1.
 *
 * system() returns -1 (with errno set) when the command interpreter could
 * not be started or its termination status could not be obtained.  That is
 * not a wait status, so it is reported from errno instead of being decoded
 * with the W* macros (which would not describe the real problem).  This
 * relies on errno still holding the value system() left, so this function
 * must be called immediately after system().
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	/* pg_ctl succeeded: nothing to report. */
	if (rc == 0)
		return;

	if (rc == -1)
	{
		/* system() itself failed; pg_ctl's exit status is unavailable. */
		pg_log_error("could not execute pg_ctl: %m");
	}
	else if (WIFEXITED(rc))
	{
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	}
	else if (WIFSIGNALED(rc))
	{
#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X",
					 WTERMSIG(rc));
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
#endif
	}
	else
	{
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);
	}

	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1550
1551static void
1552start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1553 bool restrict_logical_worker)
1554{
1555 PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1556 int rc;
1557
1558 appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1559 appendShellString(pg_ctl_cmd, subscriber_dir);
1560 appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1561
1562 /* Prevent unintended slot invalidation */
1563 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1564
1565 if (restricted_access)
1566 {
1567 appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1568#if !defined(WIN32)
1569
1570 /*
1571 * An empty listen_addresses list means the server does not listen on
1572 * any IP interfaces; only Unix-domain sockets can be used to connect
1573 * to the server. Prevent external connections to minimize the chance
1574 * of failure.
1575 */
1576 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1577 if (opt->socket_dir)
1578 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1579 opt->socket_dir);
1580 appendPQExpBufferChar(pg_ctl_cmd, '"');
1581#endif
1582 }
1583 if (opt->config_file != NULL)
1584 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1585 opt->config_file);
1586
1587 /* Suppress to start logical replication if requested */
1588 if (restrict_logical_worker)
1589 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1590
1591 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1592 rc = system(pg_ctl_cmd->data);
1593 pg_ctl_status(pg_ctl_cmd->data, rc);
1594 standby_running = true;
1595 destroyPQExpBuffer(pg_ctl_cmd);
1596 pg_log_info("server was started");
1597}
1598
1599static void
1601{
1602 char *pg_ctl_cmd;
1603 int rc;
1604
1605 pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1606 datadir);
1607 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1608 rc = system(pg_ctl_cmd);
1609 pg_ctl_status(pg_ctl_cmd, rc);
1610 standby_running = false;
1611 pg_log_info("server was stopped");
1612}
1613
1614/*
1615 * Returns after the server finishes the recovery process.
1616 *
1617 * If recovery_timeout option is set, terminate abnormally without finishing
1618 * the recovery process. By default, it waits forever.
1619 *
1620 * XXX Is the recovery process still in progress? When recovery process has a
1621 * better progress reporting mechanism, it should be added here.
1622 */
1623static void
1624wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1625{
1626 PGconn *conn;
1627 bool ready = false;
1628 int timer = 0;
1629
1630 pg_log_info("waiting for the target server to reach the consistent state");
1631
1632 conn = connect_database(conninfo, true);
1633
1634 for (;;)
1635 {
1636 /* Did the recovery process finish? We're done if so. */
1638 {
1639 ready = true;
1640 recovery_ended = true;
1641 break;
1642 }
1643
1644 /* Bail out after recovery_timeout seconds if this option is set */
1645 if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1646 {
1648 pg_log_error("recovery timed out");
1650 }
1651
1652 /* Keep waiting */
1654 timer += WAIT_INTERVAL;
1655 }
1656
1657 disconnect_database(conn, false);
1658
1659 if (!ready)
1660 pg_fatal("server did not end recovery");
1661
1662 pg_log_info("target server reached the consistent state");
1663 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1664}
1665
1666/*
1667 * Create a publication that includes all tables in the database.
1668 */
1669static void
1671{
1673 PGresult *res;
1674 char *ipubname_esc;
1675 char *spubname_esc;
1676
1677 Assert(conn != NULL);
1678
1679 ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1680 spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1681
1682 /* Check if the publication already exists */
1684 "SELECT 1 FROM pg_catalog.pg_publication "
1685 "WHERE pubname = %s",
1686 spubname_esc);
1687 res = PQexec(conn, str->data);
1688 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1689 {
1690 pg_log_error("could not obtain publication information: %s",
1693 }
1694
1695 if (PQntuples(res) == 1)
1696 {
1697 /*
1698 * Unfortunately, if it reaches this code path, it will always fail
1699 * (unless you decide to change the existing publication name). That's
1700 * bad but it is very unlikely that the user will choose a name with
1701 * pg_createsubscriber_ prefix followed by the exact database oid and
1702 * a random number.
1703 */
1704 pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1705 pg_log_error_hint("Consider renaming this publication before continuing.");
1707 }
1708
1709 PQclear(res);
1711
1712 if (dry_run)
1713 pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
1714 dbinfo->pubname, dbinfo->dbname);
1715 else
1716 pg_log_info("creating publication \"%s\" in database \"%s\"",
1717 dbinfo->pubname, dbinfo->dbname);
1718
1719 appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1720 ipubname_esc);
1721
1722 pg_log_debug("command is: %s", str->data);
1723
1724 if (!dry_run)
1725 {
1726 res = PQexec(conn, str->data);
1727 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1728 {
1729 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1730 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1732 }
1733 PQclear(res);
1734 }
1735
1736 /* For cleanup purposes */
1737 dbinfo->made_publication = true;
1738
1739 PQfreemem(ipubname_esc);
1740 PQfreemem(spubname_esc);
1742}
1743
1744/*
1745 * Drop the specified publication in the given database.
1746 */
1747static void
1748drop_publication(PGconn *conn, const char *pubname, const char *dbname,
1749 bool *made_publication)
1750{
1752 PGresult *res;
1753 char *pubname_esc;
1754
1755 Assert(conn != NULL);
1756
1757 pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1758
1759 if (dry_run)
1760 pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
1761 pubname, dbname);
1762 else
1763 pg_log_info("dropping publication \"%s\" in database \"%s\"",
1764 pubname, dbname);
1765
1766 appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1767
1768 PQfreemem(pubname_esc);
1769
1770 pg_log_debug("command is: %s", str->data);
1771
1772 if (!dry_run)
1773 {
1774 res = PQexec(conn, str->data);
1775 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1776 {
1777 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1778 pubname, dbname, PQresultErrorMessage(res));
1779 *made_publication = false; /* don't try again. */
1780
1781 /*
1782 * Don't disconnect and exit here. This routine is used by primary
1783 * (cleanup publication / replication slot due to an error) and
1784 * subscriber (remove the replicated publications). In both cases,
1785 * it can continue and provide instructions for the user to remove
1786 * it later if cleanup fails.
1787 */
1788 }
1789 PQclear(res);
1790 }
1791
1793}
1794
1795/*
1796 * Retrieve and drop the publications.
1797 *
1798 * Publications copied during physical replication remain on the subscriber
1799 * after promotion. If --clean=publications is specified, drop all existing
1800 * publications in the subscriber database. Otherwise, only drop publications
1801 * that were created by pg_createsubscriber during this operation.
1802 */
1803static void
1805{
1806 PGresult *res;
1808
1809 Assert(conn != NULL);
1810
1811 if (drop_all_pubs)
1812 {
1813 pg_log_info("dropping all existing publications in database \"%s\"",
1814 dbinfo->dbname);
1815
1816 /* Fetch all publication names */
1817 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1818 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1819 {
1820 pg_log_error("could not obtain publication information: %s",
1822 PQclear(res);
1824 }
1825
1826 /* Drop each publication */
1827 for (int i = 0; i < PQntuples(res); i++)
1828 drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1829 &dbinfo->made_publication);
1830
1831 PQclear(res);
1832 }
1833 else
1834 {
1835 /* Drop publication only if it was created by this tool */
1836 if (dbinfo->made_publication)
1837 {
1838 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1839 &dbinfo->made_publication);
1840 }
1841 else
1842 {
1843 if (dry_run)
1844 pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
1845 dbinfo->pubname, dbinfo->dbname);
1846 else
1847 pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
1848 dbinfo->pubname, dbinfo->dbname);
1849 }
1850 }
1851}
1852
1853/*
1854 * Create a subscription with some predefined options.
1855 *
1856 * A replication slot was already created in a previous step. Let's use it. It
1857 * is not required to copy data. The subscription will be created but it will
1858 * not be enabled now. That's because the replication progress must be set and
1859 * the replication origin name (one of the function arguments) contains the
1860 * subscription OID in its name. Once the subscription is created,
1861 * set_replication_progress() can obtain the chosen origin name and set up its
1862 * initial location.
1863 */
1864static void
1866{
1868 PGresult *res;
1869 char *pubname_esc;
1870 char *subname_esc;
1871 char *pubconninfo_esc;
1872 char *replslotname_esc;
1873
1874 Assert(conn != NULL);
1875
1876 pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1877 subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1878 pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1879 replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1880
1881 if (dry_run)
1882 pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
1883 dbinfo->subname, dbinfo->dbname);
1884 else
1885 pg_log_info("creating subscription \"%s\" in database \"%s\"",
1886 dbinfo->subname, dbinfo->dbname);
1887
1889 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1890 "WITH (create_slot = false, enabled = false, "
1891 "slot_name = %s, copy_data = false, two_phase = %s)",
1892 subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
1893 dbinfos.two_phase ? "true" : "false");
1894
1895 PQfreemem(pubname_esc);
1896 PQfreemem(subname_esc);
1897 PQfreemem(pubconninfo_esc);
1898 PQfreemem(replslotname_esc);
1899
1900 pg_log_debug("command is: %s", str->data);
1901
1902 if (!dry_run)
1903 {
1904 res = PQexec(conn, str->data);
1905 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1906 {
1907 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1908 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
1910 }
1911 PQclear(res);
1912 }
1913
1915}
1916
1917/*
1918 * Sets the replication progress to the consistent LSN.
1919 *
1920 * The subscriber caught up to the consistent LSN provided by the last
1921 * replication slot that was created. The goal is to set up the initial
1922 * location for the logical replication that is the exact LSN at which the
1923 * subscriber was promoted. Once the subscription is enabled it will start
1924 * streaming from that location onwards. In dry run mode, the subscription OID
1925 * and LSN are set to invalid values for printing purposes.
1926 */
1927static void
1928set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1929{
1931 PGresult *res;
1932 Oid suboid;
1933 char *subname;
1934 char *dbname;
1935 char *originname;
1936 char *lsnstr;
1937
1938 Assert(conn != NULL);
1939
1940 subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
1941 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1942
1944 "SELECT s.oid FROM pg_catalog.pg_subscription s "
1945 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1946 "WHERE s.subname = %s AND d.datname = %s",
1947 subname, dbname);
1948
1949 res = PQexec(conn, str->data);
1950 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1951 {
1952 pg_log_error("could not obtain subscription OID: %s",
1955 }
1956
1957 if (PQntuples(res) != 1 && !dry_run)
1958 {
1959 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1960 PQntuples(res), 1);
1962 }
1963
1964 if (dry_run)
1965 {
1966 suboid = InvalidOid;
1967 lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1968 }
1969 else
1970 {
1971 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1972 lsnstr = psprintf("%s", lsn);
1973 }
1974
1975 PQclear(res);
1976
1977 /*
1978 * The origin name is defined as pg_%u. %u is the subscription OID. See
1979 * ApplyWorkerMain().
1980 */
1981 originname = psprintf("pg_%u", suboid);
1982
1983 if (dry_run)
1984 pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1985 originname, lsnstr, dbinfo->dbname);
1986 else
1987 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1988 originname, lsnstr, dbinfo->dbname);
1989
1992 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1993 originname, lsnstr);
1994
1995 pg_log_debug("command is: %s", str->data);
1996
1997 if (!dry_run)
1998 {
1999 res = PQexec(conn, str->data);
2000 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2001 {
2002 pg_log_error("could not set replication progress for subscription \"%s\": %s",
2003 dbinfo->subname, PQresultErrorMessage(res));
2005 }
2006 PQclear(res);
2007 }
2008
2011 pg_free(originname);
2012 pg_free(lsnstr);
2014}
2015
2016/*
2017 * Enables the subscription.
2018 *
2019 * The subscription was created in a previous step but it was disabled. After
2020 * adjusting the initial logical replication location, enable the subscription.
2021 */
2022static void
2024{
2026 PGresult *res;
2027 char *subname;
2028
2029 Assert(conn != NULL);
2030
2031 subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2032
2033 if (dry_run)
2034 pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
2035 dbinfo->subname, dbinfo->dbname);
2036 else
2037 pg_log_info("enabling subscription \"%s\" in database \"%s\"",
2038 dbinfo->subname, dbinfo->dbname);
2039
2040 appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
2041
2042 pg_log_debug("command is: %s", str->data);
2043
2044 if (!dry_run)
2045 {
2046 res = PQexec(conn, str->data);
2047 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2048 {
2049 pg_log_error("could not enable subscription \"%s\": %s",
2050 dbinfo->subname, PQresultErrorMessage(res));
2052 }
2053
2054 PQclear(res);
2055 }
2056
2059}
2060
2061/*
2062 * Fetch a list of all connectable non-template databases from the source server
2063 * and form a list such that they appear as if the user has specified multiple
2064 * --database options, one for each source database.
2065 */
2066static void
2068 bool dbnamespecified)
2069{
2070 PGconn *conn;
2071 PGresult *res;
2072
2073 /* If a database name was specified, just connect to it. */
2074 if (dbnamespecified)
2076 else
2077 {
2078 /* Otherwise, try postgres first and then template1. */
2079 char *conninfo;
2080
2081 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2082 conn = connect_database(conninfo, false);
2083 pg_free(conninfo);
2084 if (!conn)
2085 {
2086 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2087 conn = connect_database(conninfo, true);
2088 pg_free(conninfo);
2089 }
2090 }
2091
2092 res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2093 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2094 {
2095 pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2096 PQclear(res);
2098 }
2099
2100 for (int i = 0; i < PQntuples(res); i++)
2101 {
2102 const char *dbname = PQgetvalue(res, i, 0);
2103
2105
2106 /* Increment num_dbs to reflect multiple --database options */
2107 num_dbs++;
2108 }
2109
2110 PQclear(res);
2111 disconnect_database(conn, false);
2112}
2113
/*
 * main
 *
 * Entry point of pg_createsubscriber: converts the standby data directory
 * given with -D/--pgdata into a logical replica of the publisher named by
 * -P/--publisher-server.
 *
 * Flow visible in this function: parse and validate the options; resolve the
 * pg_ctl and pg_resetwal paths; build per-database publication/subscription
 * info with store_pub_sub_info(); register cleanup_objects_atexit(); refuse
 * to continue if the publisher and standby system identifiers differ or if
 * the standby's postmaster.pid exists; start the standby with temporary
 * command-line parameters; create publisher-side objects with
 * setup_publisher(); write recovery parameters with setup_recovery();
 * restart the standby; create the subscriptions with setup_subscriber(); and
 * finally set "success" and return 0.
 *
 * Errors are reported either with pg_log_error() followed by exit(1) or with
 * pg_fatal().  Once cleanup_objects_atexit() has been registered with
 * atexit(), it also runs when the process exits.
 *
 * NOTE(review): this rendered listing omits several statements (for example
 * some getopt case bodies and the calls under several step comments); the
 * comments added here describe only the code that is visible.
 */
2114int
2115main(int argc, char **argv)
2116{
/*
 * Options whose val is 1-5 have no single-letter form.  'V' and '?' are not
 * in the getopt_long() option string below: they are honored only as the
 * very first argument (see the argc > 1 check); anywhere else they reach
 * the switch's default branch and are rejected.
 */
2117 static struct option long_options[] =
2118 {
2119 {"all", no_argument, NULL, 'a'},
2120 {"database", required_argument, NULL, 'd'},
2121 {"pgdata", required_argument, NULL, 'D'},
2122 {"dry-run", no_argument, NULL, 'n'},
2123 {"subscriber-port", required_argument, NULL, 'p'},
2124 {"publisher-server", required_argument, NULL, 'P'},
2125 {"socketdir", required_argument, NULL, 's'},
2126 {"recovery-timeout", required_argument, NULL, 't'},
2127 {"enable-two-phase", no_argument, NULL, 'T'},
2128 {"subscriber-username", required_argument, NULL, 'U'},
2129 {"verbose", no_argument, NULL, 'v'},
2130 {"version", no_argument, NULL, 'V'},
2131 {"help", no_argument, NULL, '?'},
2132 {"config-file", required_argument, NULL, 1},
2133 {"publication", required_argument, NULL, 2},
2134 {"replication-slot", required_argument, NULL, 3},
2135 {"subscription", required_argument, NULL, 4},
2136 {"clean", required_argument, NULL, 5},
2137 {NULL, 0, NULL, 0}
2138 };
2139
/* All option fields start zeroed; several are reset explicitly below. */
2140 struct CreateSubscriberOptions opt = {0};
2141
2142 int c;
2143 int option_index;
2144
2145 char *pub_base_conninfo;
2146 char *sub_base_conninfo;
/* Receives the dbname found in the publisher connection string, if any. */
2147 char *dbname_conninfo = NULL;
2148
/* System identifiers compared to verify the standby is a copy of the publisher. */
2149 uint64 pub_sysid;
2150 uint64 sub_sysid;
2151 struct stat statbuf;
2152
/* LSN returned by setup_publisher(); used for recovery and subscriptions. */
2153 char *consistent_lsn;
2154
2155 char pidfile[MAXPGPATH];
2156
2157 pg_logging_init(argv[0]);
2159 progname = get_progname(argv[0]);
2160 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2161
/* --help/-? and -V/--version are recognized only as the first argument. */
2162 if (argc > 1)
2163 {
2164 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2165 {
2166 usage();
2167 exit(0);
2168 }
2169 else if (strcmp(argv[1], "-V") == 0
2170 || strcmp(argv[1], "--version") == 0)
2171 {
2172 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2173 exit(0);
2174 }
2175 }
2176
2177 /* Default settings */
2178 subscriber_dir = NULL;
2179 opt.config_file = NULL;
2180 opt.pub_conninfo_str = NULL;
2181 opt.socket_dir = NULL;
2183 opt.sub_username = NULL;
2184 opt.two_phase = false;
2186 {
2187 0
2188 };
2189 opt.recovery_timeout = 0;
2190 opt.all_dbs = false;
2191
2192 /*
2193 * Don't allow it to be run as root. It uses pg_ctl which does not allow
2194 * it either.
2195 */
2196#ifndef WIN32
2197 if (geteuid() == 0)
2198 {
2199 pg_log_error("cannot be executed by \"root\"");
2200 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2201 progname);
2202 exit(1);
2203 }
2204#endif
2205
2207
/*
 * Parse command-line options.  The repeatable options (-d, --publication,
 * --replication-slot, --subscription, --clean) reject a duplicate value
 * with pg_fatal(); num_dbs, num_pubs, num_replslots and num_subs count the
 * names given so they can be cross-checked after parsing.
 */
2208 while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
2209 long_options, &option_index)) != -1)
2210 {
2211 switch (c)
2212 {
2213 case 'a':
2214 opt.all_dbs = true;
2215 break;
2216 case 'd':
2218 {
2220 num_dbs++;
2221 }
2222 else
2223 pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2224 break;
2225 case 'D':
2228 break;
2229 case 'n':
2230 dry_run = true;
2231 break;
2232 case 'p':
2233 opt.sub_port = pg_strdup(optarg);
2234 break;
2235 case 'P':
2237 break;
2238 case 's':
2241 break;
2242 case 't':
/*
 * NOTE(review): atoi() provides no error detection: non-numeric text
 * yields 0, negative values are accepted, and out-of-range input is
 * undefined behavior.  Consider strtol()-based validation.
 */
2243 opt.recovery_timeout = atoi(optarg);
2244 break;
2245 case 'T':
2246 opt.two_phase = true;
2247 break;
2248 case 'U':
2250 break;
2251 case 'v':
2253 break;
2254 case 1:
2256 break;
2257 case 2:
2259 {
2261 num_pubs++;
2262 }
2263 else
2264 pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
2265 break;
2266 case 3:
2268 {
2270 num_replslots++;
2271 }
2272 else
2273 pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2274 break;
2275 case 4:
2277 {
2279 num_subs++;
2280 }
2281 else
2282 pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2283 break;
2284 case 5:
2287 else
2288 pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
2289 break;
2290 default:
2291 /* getopt_long already emitted a complaint */
2292 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2293 exit(1);
2294 }
2295 }
2296
2297 /* Validate that --all is not used with incompatible options */
2298 if (opt.all_dbs)
2299 {
/*
 * Only the first conflicting option, in the order checked below, is
 * reported.  NOTE(review): bad_switch only ever points at string
 * literals, so "const char *" would describe it more accurately.
 */
2300 char *bad_switch = NULL;
2301
2302 if (num_dbs > 0)
2303 bad_switch = "--database";
2304 else if (num_pubs > 0)
2305 bad_switch = "--publication";
2306 else if (num_replslots > 0)
2307 bad_switch = "--replication-slot";
2308 else if (num_subs > 0)
2309 bad_switch = "--subscription";
2310
2311 if (bad_switch)
2312 {
2313 pg_log_error("options %s and %s cannot be used together",
2314 bad_switch, "-a/--all");
2315 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2316 exit(1);
2317 }
2318 }
2319
2320 /* Any non-option arguments? */
2321 if (optind < argc)
2322 {
2323 pg_log_error("too many command-line arguments (first is \"%s\")",
2324 argv[optind]);
2325 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2326 exit(1);
2327 }
2328
2329 /* Required arguments */
2330 if (subscriber_dir == NULL)
2331 {
2332 pg_log_error("no subscriber data directory specified");
2333 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2334 exit(1);
2335 }
2336
2337 /* If socket directory is not provided, use the current directory */
2338 if (opt.socket_dir == NULL)
2339 {
2340 char cwd[MAXPGPATH];
2341
2342 if (!getcwd(cwd, MAXPGPATH))
2343 pg_fatal("could not determine current directory");
2344 opt.socket_dir = pg_strdup(cwd);
2346 }
2347
2348 /*
2349 * Parse connection string. Build a base connection string that might be
2350 * reused by multiple databases.
2351 */
2352 if (opt.pub_conninfo_str == NULL)
2353 {
2354 /*
2355 * TODO use primary_conninfo (if available) from subscriber and
2356 * extract publisher connection string. Assume that there are
2357 * identical entries for physical and logical replication. If there is
2358 * not, we would fail anyway.
2359 */
2360 pg_log_error("no publisher connection string specified");
2361 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2362 exit(1);
2363 }
2364
2365 if (dry_run)
2366 pg_log_info("Executing in dry-run mode.\n"
2367 "The target directory will not be modified.");
2368
2369 pg_log_info("validating publisher connection string");
/*
 * A NULL result ends the program with exit(1) and no further message
 * here; presumably get_base_conninfo() has already reported the problem.
 */
2370 pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2371 &dbname_conninfo);
2372 if (pub_base_conninfo == NULL)
2373 exit(1);
2374
2375 pg_log_info("validating subscriber connection string");
2376 sub_base_conninfo = get_sub_conninfo(&opt);
2377
2378 /*
2379 * Fetch all databases from the source (publisher) and treat them as if
2380 * the user specified has multiple --database options, one for each source
2381 * database.
2382 */
2383 if (opt.all_dbs)
2384 {
/* Tells get_publisher_databases() whether the conninfo named a database. */
2385 bool dbnamespecified = (dbname_conninfo != NULL);
2386
2387 get_publisher_databases(&opt, dbnamespecified);
2388 }
2389
2390 if (opt.database_names.head == NULL)
2391 {
2392 pg_log_info("no database was specified");
2393
2394 /*
2395 * Try to obtain the dbname from the publisher conninfo. If dbname
2396 * parameter is not available, error out.
2397 */
2398 if (dbname_conninfo)
2399 {
2400 simple_string_list_append(&opt.database_names, dbname_conninfo);
2401 num_dbs++;
2402
2403 pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2404 dbname_conninfo);
2405 }
2406 else
2407 {
2408 pg_log_error("no database name specified");
2409 pg_log_error_hint("Try \"%s --help\" for more information.",
2410 progname);
2411 exit(1);
2412 }
2413 }
2414
2415 /* Number of object names must match number of databases */
/*
 * A count of zero means the corresponding option was not given at all and
 * is accepted here; presumably names are then generated elsewhere
 * (generate_object_name()) -- confirm.
 */
2416 if (num_pubs > 0 && num_pubs != num_dbs)
2417 {
2418 pg_log_error("wrong number of publication names specified");
2419 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2420 num_pubs, num_dbs);
2421 exit(1);
2422 }
2423 if (num_subs > 0 && num_subs != num_dbs)
2424 {
2425 pg_log_error("wrong number of subscription names specified");
2426 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2427 num_subs, num_dbs);
2428 exit(1);
2429 }
2430 if (num_replslots > 0 && num_replslots != num_dbs)
2431 {
2432 pg_log_error("wrong number of replication slot names specified");
2433 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2435 exit(1);
2436 }
2437
2438 /* Verify the object types specified for removal from the subscriber */
/* Matching is case-insensitive; "publications" is the only accepted value. */
2439 for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2440 {
2441 if (pg_strcasecmp(cell->val, "publications") == 0)
2443 else
2444 {
2445 pg_log_error("invalid object type \"%s\" specified for %s",
2446 cell->val, "--clean");
2447 pg_log_error_hint("The valid value is: \"%s\"", "publications");
2448 exit(1);
2449 }
2450 }
2451
2452 /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2453 pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2454 pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2455
2456 /* Rudimentary check for a data directory */
2458
2460
2461 /*
2462 * Store database information for publisher and subscriber. It should be
2463 * called before atexit() because its return is used in the
2464 * cleanup_objects_atexit().
2465 */
2466 dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2467
2468 /* Register a function to clean up objects in case of failure */
2469 atexit(cleanup_objects_atexit);
2470
2471 /*
2472 * Check if the subscriber data directory has the same system identifier
2473 * than the publisher data directory.
2474 */
2476 sub_sysid = get_standby_sysid(subscriber_dir);
2477 if (pub_sysid != sub_sysid)
2478 pg_fatal("subscriber data directory is not a copy of the source database cluster");
2479
2480 /* Subscriber PID file */
2481 snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2482
2483 /*
2484 * The standby server must not be running. If the server is started under
2485 * service manager and pg_createsubscriber stops it, the service manager
2486 * might react to this action and start the server again. Therefore,
2487 * refuse to proceed if the server is running to avoid possible failures.
2488 */
2489 if (stat(pidfile, &statbuf) == 0)
2490 {
2491 pg_log_error("standby server is running");
2492 pg_log_error_hint("Stop the standby server and try again.");
2493 exit(1);
2494 }
2495
2496 /*
2497 * Start a short-lived standby server with temporary parameters (provided
2498 * by command-line options). The goal is to avoid connections during the
2499 * transformation steps.
2500 */
2501 pg_log_info("starting the standby server with command-line options");
/* Arguments: restricted_access = true, restrict_logical_worker = false. */
2502 start_standby_server(&opt, true, false);
2503
2504 /* Check if the standby server is ready for logical replication */
2506
2507 /* Check if the primary server is ready for logical replication */
2509
2510 /*
2511 * Stop the target server. The recovery process requires that the server
2512 * reaches a consistent state before targeting the recovery stop point.
2513 * Make sure a consistent state is reached (stop the target server
2514 * guarantees it) *before* creating the replication slots in
2515 * setup_publisher().
2516 */
2517 pg_log_info("stopping the subscriber");
2519
2520 /* Create the required objects for each database on publisher */
2521 consistent_lsn = setup_publisher(dbinfos.dbinfo);
2522
2523 /* Write the required recovery parameters */
2524 setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2525
2526 /*
2527 * Start subscriber so the recovery parameters will take effect. Wait
2528 * until accepting connections. We don't want to start logical replication
2529 * during setup.
2530 */
2531 pg_log_info("starting the subscriber");
/* Arguments: restricted_access = true, restrict_logical_worker = true. */
2532 start_standby_server(&opt, true, true);
2533
2534 /* Waiting the subscriber to be promoted */
2536
2537 /*
2538 * Create the subscription for each database on subscriber. It does not
2539 * enable it immediately because it needs to adjust the replication start
2540 * point to the LSN reported by setup_publisher(). It also cleans up
2541 * publications created by this tool and replication to the standby.
2542 */
2543 setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2544
2545 /* Remove primary_slot_name if it exists on primary */
2547
2548 /* Remove failover replication slots if they exist on subscriber */
2550
2551 /* Stop the subscriber */
2552 pg_log_info("stopping the subscriber");
2554
2555 /* Change system identifier from subscriber */
2557
/*
 * NOTE(review): "success" is presumably consulted by
 * cleanup_objects_atexit() so that a completed run is not rolled back --
 * confirm.
 */
2558 success = true;
2559
2560 pg_log_info("Done!");
2561
2562 return 0;
2563}
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1199
uint32 bits32
Definition: c.h:561
uint64_t uint64
Definition: c.h:553
uint32_t uint32
Definition: c.h:552
int find_my_exec(const char *argv0, char *retpath)
Definition: exec.c:161
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:430
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition: exec.c:311
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
#define USECS_PER_SEC
Definition: timestamp.h:134
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define _(x)
Definition: elog.c:91
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:825
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7525
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:6241
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7641
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5316
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7704
void PQfreemem(void *ptr)
Definition: fe-exec.c:4049
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4399
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2279
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4405
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define pg_malloc_array(type, count)
Definition: fe_memutils.h:56
uint32 get_pg_version(const char *datadir, char **version_str)
Definition: version.c:44
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:25
#define required_argument
Definition: getopt_long.h:26
Assert(PointerIsAligned(start, uint64))
const char * str
long val
Definition: informix.c:689
int i
Definition: isn.c:77
#define PQresultErrorMessage
#define PQgetvalue
Definition: libpq-be-fe.h:253
#define PQclear
Definition: libpq-be-fe.h:245
#define PQresultStatus
Definition: libpq-be-fe.h:247
#define PQntuples
Definition: libpq-be-fe.h:251
@ CONNECTION_OK
Definition: libpq-fe.h:84
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:128
void pg_logging_increase_verbosity(void)
Definition: logging.c:185
void pg_logging_init(const char *argv0)
Definition: logging.c:83
void pg_logging_set_level(enum pg_log_level new_level)
Definition: logging.c:176
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_info(...)
Definition: logging.h:124
#define pg_log_warning_hint(...)
Definition: logging.h:121
#define pg_log_info_hint(...)
Definition: logging.h:130
@ PG_LOG_WARNING
Definition: logging.h:38
#define pg_log_warning_detail(...)
Definition: logging.h:118
#define pg_log_error_detail(...)
Definition: logging.h:109
#define pg_log_debug(...)
Definition: logging.h:133
#define pg_fatal(...)
static PQExpBuffer recoveryconfcontents
#define MAXPGPATH
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
#define WAIT_INTERVAL
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
static void stop_standby_server(const char *datadir)
static char * pg_ctl_path
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static char * get_exec_path(const char *argv0, const char *progname)
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static char * subscriber_dir
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
int main(int argc, char **argv)
static char * primary_slot_name
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static pg_prng_state prng_state
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
static void check_data_directory(const char *datadir)
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static struct LogicalRepInfos dbinfos
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
static bool success
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static bool recovery_ended
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static void drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void disconnect_database(PGconn *conn, bool exit_on_error)
static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static bool find_publication(PGconn *conn, const char *pubname, const char *dbname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static bool dry_run
static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
#define OBJECTTYPE_PUBLICATIONS
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static char * pg_resetwal_path
static char * generate_object_name(PGconn *conn)
static const char * progname
static void usage(void)
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
static char * argv0
Definition: pg_ctl.c:94
static char * exec_path
Definition: pg_ctl.c:89
PGDLLIMPORT int optind
Definition: getopt.c:51
PGDLLIMPORT char * optarg
Definition: getopt.c:53
uint32 pg_prng_uint32(pg_prng_state *state)
Definition: pg_prng.c:227
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition: pg_prng.c:89
char * datadir
NameData subname
static char buf[DEFAULT_XLOG_SEG_SIZE]
Definition: pg_test_fsync.c:71
#define pg_log_warning(...)
Definition: pgfnames.c:24
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:32
const char * pg_strsignal(int signum)
Definition: pgstrsignal.c:39
void canonicalize_path(char *path)
Definition: path.c:337
#define snprintf
Definition: port.h:260
#define DEVNULL
Definition: port.h:161
const char * get_progname(const char *argv0)
Definition: path.c:652
#define printf(...)
Definition: port.h:266
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
char * c
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
Definition: recovery_gen.c:125
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
Definition: recovery_gen.c:28
void get_restricted_token(void)
void pg_usleep(long microsec)
Definition: signal.c:53
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition: simple_list.c:87
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition: simple_list.c:63
struct SimpleStringList SimpleStringList
char * dbname
Definition: streamutil.c:49
PGconn * conn
Definition: streamutil.c:52
void appendShellString(PQExpBuffer buf, const char *str)
Definition: string_utils.c:582
void appendConnStrVal(PQExpBuffer buf, const char *str)
Definition: string_utils.c:698
uint64 system_identifier
Definition: pg_control.h:112
SimpleStringList database_names
SimpleStringList objecttypes_to_clean
SimpleStringList replslot_names
struct LogicalRepInfo * dbinfo
char val[FLEXIBLE_ARRAY_MEMBER]
Definition: simple_list.h:37
struct SimpleStringListCell * next
Definition: simple_list.h:34
SimpleStringListCell * head
Definition: simple_list.h:42
#define GET_PG_MAJORVERSION_NUM(v)
Definition: version.h:19
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:33
#define stat
Definition: win32_port.h:274
#define WIFEXITED(w)
Definition: win32_port.h:150
#define WIFSIGNALED(w)
Definition: win32_port.h:151
#define WTERMSIG(w)
Definition: win32_port.h:153
#define WEXITSTATUS(w)
Definition: win32_port.h:152
int gettimeofday(struct timeval *tp, void *tzp)
int wal_level
Definition: xlog.c:134
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28