PostgreSQL Source Code git master
pg_createsubscriber.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * pg_createsubscriber.c
4 * Create a new logical replica from a standby server
5 *
6 * Copyright (c) 2024-2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/bin/pg_basebackup/pg_createsubscriber.c
10 *
11 *-------------------------------------------------------------------------
12 */
13
14#include "postgres_fe.h"
15
16#include <sys/stat.h>
17#include <sys/time.h>
18#include <sys/wait.h>
19#include <time.h>
20
21#include "common/connect.h"
23#include "common/logging.h"
24#include "common/pg_prng.h"
29#include "getopt_long.h"
30
/* Subscriber port number used when none is given; printed as the default by usage(). */
31#define DEFAULT_SUB_PORT "50432"
32
33/* Command-line options */
/*
 * NOTE(review): the line naming this struct is missing from this extract; the
 * prototypes refer to it as "struct CreateSubscriberOptions". The fields
 * presumably correspond to the options listed in usage() -- confirm against
 * the option parsing code, which is not visible here.
 */
35{
36 char *config_file; /* configuration file */
37 char *pub_conninfo_str; /* publisher connection string */
38 char *socket_dir; /* directory for Unix-domain socket, if any */
39 char *sub_port; /* subscriber port number */
40 const char *sub_username; /* subscriber username */
41 bool two_phase; /* enable-two-phase option */
42 SimpleStringList database_names; /* list of database names */
43 SimpleStringList pub_names; /* list of publication names */
44 SimpleStringList sub_names; /* list of subscription names */
45 SimpleStringList replslot_names; /* list of replication slot names */
46 int recovery_timeout; /* stop recovery after this time */
47};
48
49/* per-database publication/subscription info (one entry per database) */
/*
 * NOTE(review): the line naming this struct is missing from this extract; the
 * rest of the file refers to it as "struct LogicalRepInfo".
 */
51{
52 char *dbname; /* database name */
53 char *pubconninfo; /* publisher connection string */
54 char *subconninfo; /* subscriber connection string */
55 char *pubname; /* publication name */
56 char *subname; /* subscription name */
57 char *replslotname; /* replication slot name */
58
 /*
 * Both flags start out false; cleanup_objects_atexit() reads them to decide
 * which publisher-side objects must be dropped (or reported) on failure.
 */
59 bool made_replslot; /* replication slot was created */
60 bool made_publication; /* publication was created */
61};
62
63/*
64 * Information shared across all the databases (or publications and
65 * subscriptions).
 *
 * NOTE(review): the struct tag line and at least one member line are missing
 * from this extract. cleanup_objects_atexit() accesses dbinfos.dbinfo[i], so
 * a member named dbinfo (presumably a LogicalRepInfo array) lives here.
66 */
68{
70 bool two_phase; /* enable-two-phase option */
71};
72
/*
 * Forward declarations of the file-local functions.
 *
 * NOTE(review): usage() is declared with an empty parameter list, while its
 * definition is usage(void). Before C23 an empty list in a declaration does
 * not provide a prototype, so "static void usage(void);" would be preferable.
 *
 * NOTE(review): lines 88, 100 and 117 are missing from this extract. This
 * leaves two declarations whose first line is not visible: the one ending in
 * "struct LogicalRepInfo *dbinfo);" after drop_failover_replication_slots(),
 * and the one ending in "const struct LogicalRepInfo *dbinfo);" after
 * enable_subscription().
 */
73static void cleanup_objects_atexit(void);
74static void usage();
75static char *get_base_conninfo(const char *conninfo, char **dbname);
76static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
77static char *get_exec_path(const char *argv0, const char *progname);
78static void check_data_directory(const char *datadir);
79static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
80static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
81 const char *pub_base_conninfo,
82 const char *sub_base_conninfo);
83static PGconn *connect_database(const char *conninfo, bool exit_on_error);
84static void disconnect_database(PGconn *conn, bool exit_on_error);
85static uint64 get_primary_sysid(const char *conninfo);
86static uint64 get_standby_sysid(const char *datadir);
87static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
89static char *generate_object_name(PGconn *conn);
90static void check_publisher(const struct LogicalRepInfo *dbinfo);
91static char *setup_publisher(struct LogicalRepInfo *dbinfo);
92static void check_subscriber(const struct LogicalRepInfo *dbinfo);
93static void setup_subscriber(struct LogicalRepInfo *dbinfo,
94 const char *consistent_lsn);
95static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
96 const char *lsn);
97static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
98 const char *slotname);
99static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo);
101 struct LogicalRepInfo *dbinfo);
102static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
103 const char *slot_name);
104static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
105static void start_standby_server(const struct CreateSubscriberOptions *opt,
106 bool restricted_access,
107 bool restrict_logical_worker);
108static void stop_standby_server(const char *datadir);
109static void wait_for_end_recovery(const char *conninfo,
110 const struct CreateSubscriberOptions *opt);
111static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
112static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
113static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
114static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
115 const char *lsn);
116static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
118 const struct LogicalRepInfo *dbinfo);
119static void drop_existing_subscriptions(PGconn *conn, const char *subname,
120 const char *dbname);
121
122#define USEC_PER_SEC 1000000
123#define WAIT_INTERVAL 1 /* 1 second */
124
125static const char *progname;
126
/*
 * Presumably filled from the standby's primary_slot_name setting by
 * check_subscriber(), which logs it; the assignment line is not visible here.
 */
127static char *primary_slot_name = NULL;
/* -n/--dry-run: "just show what would be done" (see usage()). */
128static bool dry_run = false;
129
/* cleanup_objects_atexit() returns immediately when this is true. */
130static bool success = false;
131
/*
 * NOTE(review): line 132 is missing from this extract. Code elsewhere uses a
 * file-level "dbinfos" (dbinfos.dbinfo[i], dbinfos.two_phase), presumably
 * declared there.
 */
133static int num_dbs = 0; /* number of specified databases */
134static int num_pubs = 0; /* number of specified publications */
135static int num_subs = 0; /* number of specified subscriptions */
136static int num_replslots = 0; /* number of specified replication slots */
137
/*
 * NOTE(review): line 138 is missing from this extract. generate_object_name()
 * and setup_publisher() use "prng_state", presumably declared there.
 */
139
140static char *pg_ctl_path = NULL;
141static char *pg_resetwal_path = NULL;
142
143/* standby / subscriber data directory */
144static char *subscriber_dir = NULL;
145
/*
 * Both are read by cleanup_objects_atexit(). Once recovery has ended, the
 * target can no longer be used as a physical replica. standby_running
 * presumably tracks whether this program currently has the target server
 * started -- confirm against start/stop_standby_server().
 */
146static bool recovery_ended = false;
147static bool standby_running = false;
148
/*
 * NOTE(review): lines 149 and 151-153 are missing from this extract; only
 * fragments of a definition whose header is not visible remain below.
 */
150{
154
155
156/*
157 * Clean up objects that were created by pg_createsubscriber if it fails
158 * before completing (nothing is done once "success" has been set).
159 *
160 * Publications and replication slots are created on the primary. Depending on
161 * the step at which it failed, already created objects are removed when that
162 * is possible (sometimes it won't work due to a connection issue; the objects
 * left behind are then reported with warnings so the user can drop them).
163 * There is no cleanup on the target server. The steps on the target server are
164 * executed *after* promotion, hence, at this point, a failure means the
165 * physical replica must be recreated before starting again.
 *
 * The publisher connection is opened with exit_on_error = false, so a failed
 * connection makes connect_database() return NULL instead of exiting, and
 * cleanup continues with the remaining databases.
 *
 * NOTE(review): the function name line (168) and lines 200 and 228 are
 * missing from this extract.
166 */
167static void
169{
170 if (success)
171 return;
172
173 /*
174 * If the server is promoted, there is no way to use the current setup
175 * again. Warn the user that a new replication setup should be done before
176 * trying again.
177 */
178 if (recovery_ended)
179 {
180 pg_log_warning("failed after the end of recovery");
181 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
182 "You must recreate the physical replica before continuing.");
183 }
184
 /* Drop (or report) the publisher-side objects created for each database. */
185 for (int i = 0; i < num_dbs; i++)
186 {
187 struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
188
189 if (dbinfo->made_publication || dbinfo->made_replslot)
190 {
191 PGconn *conn;
192
193 conn = connect_database(dbinfo->pubconninfo, false);
194 if (conn != NULL)
195 {
196 if (dbinfo->made_publication)
197 drop_publication(conn, dbinfo);
198 if (dbinfo->made_replslot)
199 drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
201 }
202 else
203 {
204 /*
205 * If a connection could not be established, inform the user
206 * that some objects were left on primary and should be
207 * removed before trying again.
208 */
209 if (dbinfo->made_publication)
210 {
211 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
212 dbinfo->pubname,
213 dbinfo->dbname);
214 pg_log_warning_hint("Drop this publication before trying again.");
215 }
216 if (dbinfo->made_replslot)
217 {
218 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
219 dbinfo->replslotname,
220 dbinfo->dbname);
221 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
222 }
223 }
224 }
225 }
226
 /*
 * NOTE(review): the statement guarded by this condition (line 228) is missing
 * from this extract; presumably it stops the target server.
 */
227 if (standby_running)
229}
230
231static void
232usage(void)
233{
234 printf(_("%s creates a new logical replica from a standby server.\n\n"),
235 progname);
236 printf(_("Usage:\n"));
237 printf(_(" %s [OPTION]...\n"), progname);
238 printf(_("\nOptions:\n"));
239 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
240 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
241 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
242 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
243 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
244 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
245 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
246 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
247 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
248 printf(_(" -v, --verbose output verbose messages\n"));
249 printf(_(" --config-file=FILENAME use specified main server configuration\n"
250 " file when running target cluster\n"));
251 printf(_(" --publication=NAME publication name\n"));
252 printf(_(" --replication-slot=NAME replication slot name\n"));
253 printf(_(" --subscription=NAME subscription name\n"));
254 printf(_(" -V, --version output version information, then exit\n"));
255 printf(_(" -?, --help show this help, then exit\n"));
256 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
257 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
258}
259
260/*
261 * Subroutine to append "keyword=value" to a connection string,
262 * with proper quoting of the value. (We assume keywords don't need that.)
 *
 * NOTE(review): lines 268, 270 and 271 are missing from this extract. As
 * shown, the "if (buf->len > 0)" appears to guard the keyword append. In the
 * full file it presumably guards a separator between items, with the "="
 * and the quoted value appended afterwards -- confirm against the source.
263 */
264static void
265appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
266{
267 if (buf->len > 0)
269 appendPQExpBufferStr(buf, keyword);
272}
273
274/*
275 * Validate a connection string. Returns a base connection string that is a
276 * connection string without a database name, or NULL (after logging an
 * error) if conninfo cannot be parsed by PQconninfoParse().
277 *
278 * Since we might process multiple databases, each database name will be
279 * appended to this base connection string to provide a final connection
280 * string. If the second argument (dbname) is not null and the provided
281 * connection string contains a non-empty dbname, a copy of it is stored in
 * *dbname; otherwise *dbname is left untouched.
282 *
283 * It is the caller's responsibility to free the returned connection string and
284 * dbname.
 *
 * NOTE(review): lines 289, 299, 303 and 320 are missing from this extract
 * (presumably the buffer declaration/creation/destruction and error cleanup).
285 */
286static char *
287get_base_conninfo(const char *conninfo, char **dbname)
288{
290 PQconninfoOption *conn_opts;
291 PQconninfoOption *conn_opt;
292 char *errmsg = NULL;
293 char *ret;
294
295 conn_opts = PQconninfoParse(conninfo, &errmsg);
296 if (conn_opts == NULL)
297 {
298 pg_log_error("could not parse connection string: %s", errmsg);
300 return NULL;
301 }
302
304 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
305 {
 /* Keep every option that has a non-empty value, except dbname. */
306 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
307 {
308 if (strcmp(conn_opt->keyword, "dbname") == 0)
309 {
310 if (dbname)
311 *dbname = pg_strdup(conn_opt->val);
312 continue;
313 }
314 appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
315 }
316 }
317
318 ret = pg_strdup(buf->data);
319
321 PQconninfoFree(conn_opts);
322
323 return ret;
324}
325
326/*
327 * Build a subscriber connection string. Only a few parameters are supported
328 * since it starts a server with restricted access.
 *
 * The string holds port, host (the socket directory, on non-Windows builds
 * only), user (only when a subscriber username was given) and
 * fallback_application_name. It carries no dbname. The result is a
 * pg_strdup() copy owned by the caller.
 *
 * NOTE(review): the function name line (331) and lines 333 and 346
 * (presumably the buffer declaration and destruction) are missing from this
 * extract.
329 */
330static char *
332{
334 char *ret;
335
336 appendConnStrItem(buf, "port", opt->sub_port);
337#if !defined(WIN32)
338 appendConnStrItem(buf, "host", opt->socket_dir);
339#endif
340 if (opt->sub_username != NULL)
341 appendConnStrItem(buf, "user", opt->sub_username);
342 appendConnStrItem(buf, "fallback_application_name", progname);
343
344 ret = pg_strdup(buf->data);
345
347
348 return ret;
349}
350
351/*
352 * Verify if a PostgreSQL binary (progname) is available in the same directory as
353 * pg_createsubscriber and it has the same version. It returns the absolute
354 * path of the progname.
 *
 * Does not return on failure. If find_other_exec() returns -1, the program
 * was not found. Any other negative result is reported as a version mismatch.
 * Either case ends in pg_fatal().
 *
 * NOTE(review): the parameter "progname" shadows the file-level progname.
 * versionstr (allocated with psprintf()) is not freed in the visible code.
 * Line 364, presumably the allocation of exec_path, is missing from this
 * extract.
355 */
356static char *
357get_exec_path(const char *argv0, const char *progname)
358{
359 char *versionstr;
360 char *exec_path;
361 int ret;
362
 /* Expected "--version" output of the other program, e.g. "pg_ctl (PostgreSQL) <version>". */
363 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
365 ret = find_other_exec(argv0, progname, versionstr, exec_path);
366
367 if (ret < 0)
368 {
369 char full_path[MAXPGPATH];
370
 /* Only used for the error message; fall back to the bare name. */
371 if (find_my_exec(argv0, full_path) < 0)
372 strlcpy(full_path, progname, sizeof(full_path));
373
374 if (ret == -1)
375 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
376 progname, "pg_createsubscriber", full_path);
377 else
378 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
379 progname, full_path, "pg_createsubscriber");
380 }
381
382 pg_log_debug("%s path is: %s", progname, exec_path);
383
384 return exec_path;
385}
386
387/*
388 * Is it a cluster directory? These are preliminary checks. It is far from
389 * making an accurate check. If it is not a clone from the publisher, it will
390 * eventually fail in a future step.
391 */
392static void
394{
395 struct stat statbuf;
396 char versionfile[MAXPGPATH];
397
398 pg_log_info("checking if directory \"%s\" is a cluster data directory",
399 datadir);
400
401 if (stat(datadir, &statbuf) != 0)
402 {
403 if (errno == ENOENT)
404 pg_fatal("data directory \"%s\" does not exist", datadir);
405 else
406 pg_fatal("could not access directory \"%s\": %m", datadir);
407 }
408
409 snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
410 if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
411 {
412 pg_fatal("directory \"%s\" is not a database cluster directory",
413 datadir);
414 }
415}
416
417/*
418 * Append database name into a base connection string.
419 *
420 * dbname is the only parameter that changes so it is not included in the base
421 * connection string. This function concatenates dbname to build a "real"
422 * connection string.
 *
 * dbname is added through appendConnStrItem(), which quotes the value. The
 * result is a pg_strdup() copy owned by the caller; conninfo must not be NULL.
 *
 * NOTE(review): lines 427 and 436 (presumably the buffer declaration and
 * destruction) are missing from this extract.
423 */
424static char *
425concat_conninfo_dbname(const char *conninfo, const char *dbname)
426{
428 char *ret;
429
430 Assert(conninfo != NULL);
431
432 appendPQExpBufferStr(buf, conninfo);
433 appendConnStrItem(buf, "dbname", dbname);
434
435 ret = pg_strdup(buf->data);
437
438 return ret;
439}
440
441/*
442 * Store publication and subscription information.
443 *
444 * If publication, replication slot and subscription names were specified,
445 * store it here. Otherwise, a generated name will be assigned to the object in
446 * setup_publisher().
447 */
448static struct LogicalRepInfo *
450 const char *pub_base_conninfo,
451 const char *sub_base_conninfo)
452{
453 struct LogicalRepInfo *dbinfo;
454 SimpleStringListCell *pubcell = NULL;
455 SimpleStringListCell *subcell = NULL;
456 SimpleStringListCell *replslotcell = NULL;
457 int i = 0;
458
459 dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
460
461 if (num_pubs > 0)
462 pubcell = opt->pub_names.head;
463 if (num_subs > 0)
464 subcell = opt->sub_names.head;
465 if (num_replslots > 0)
466 replslotcell = opt->replslot_names.head;
467
468 for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
469 {
470 char *conninfo;
471
472 /* Fill publisher attributes */
473 conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
474 dbinfo[i].pubconninfo = conninfo;
475 dbinfo[i].dbname = cell->val;
476 if (num_pubs > 0)
477 dbinfo[i].pubname = pubcell->val;
478 else
479 dbinfo[i].pubname = NULL;
480 if (num_replslots > 0)
481 dbinfo[i].replslotname = replslotcell->val;
482 else
483 dbinfo[i].replslotname = NULL;
484 dbinfo[i].made_replslot = false;
485 dbinfo[i].made_publication = false;
486 /* Fill subscriber attributes */
487 conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
488 dbinfo[i].subconninfo = conninfo;
489 if (num_subs > 0)
490 dbinfo[i].subname = subcell->val;
491 else
492 dbinfo[i].subname = NULL;
493 /* Other fields will be filled later */
494
495 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
496 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
497 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
498 dbinfo[i].pubconninfo);
499 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
500 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
501 dbinfo[i].subconninfo,
502 dbinfos.two_phase ? "true" : "false");
503
504 if (num_pubs > 0)
505 pubcell = pubcell->next;
506 if (num_subs > 0)
507 subcell = subcell->next;
508 if (num_replslots > 0)
509 replslotcell = replslotcell->next;
510
511 i++;
512 }
513
514 return dbinfo;
515}
516
517/*
518 * Open a new connection using conninfo and clear search_path on it.
519 *
 * On failure (connection error, or search_path could not be cleared) the
 * error is logged and the connection is closed. Then, if exit_on_error is
 * true, the process exits with status 1; otherwise NULL is returned.
 *
 * NOTE(review): lines 528, 531, 540, 541 and 544 (the status checks, the
 * search_path query and the libpq error-message arguments) are missing from
 * this extract.
520 */
521static PGconn *
522connect_database(const char *conninfo, bool exit_on_error)
523{
524 PGconn *conn;
525 PGresult *res;
526
527 conn = PQconnectdb(conninfo);
529 {
530 pg_log_error("connection to database failed: %s",
532 PQfinish(conn);
533
534 if (exit_on_error)
535 exit(1);
536 return NULL;
537 }
538
539 /* Secure search_path */
542 {
543 pg_log_error("could not clear \"search_path\": %s",
545 PQclear(res);
546 PQfinish(conn);
547
548 if (exit_on_error)
549 exit(1);
550 return NULL;
551 }
552 PQclear(res);
553
554 return conn;
555}
556
557/*
558 * Close the connection. If exit_on_error is true, it has an undesired
559 * condition and it should exit immediately.
560 */
561static void
562disconnect_database(PGconn *conn, bool exit_on_error)
563{
564 Assert(conn != NULL);
565
566 PQfinish(conn);
567
568 if (exit_on_error)
569 exit(1);
570}
571
572/*
573 * Obtain the publisher's system identifier by connecting with the provided
574 * connection string (exiting if the connection fails). It will be used
575 * to compare if a data directory is a clone of another one.
 *
 * The value comes from pg_control_system() and is parsed as an unsigned
 * 64-bit decimal number.
 *
 * NOTE(review): lines 588, 591, 592, 598 and 607 (the query status check,
 * the error-exit statements and the disconnect) are missing from this
 * extract.
 */
576static uint64
577get_primary_sysid(const char *conninfo)
578{
579 PGconn *conn;
580 PGresult *res;
581 uint64 sysid;
582
583 pg_log_info("getting system identifier from publisher");
584
585 conn = connect_database(conninfo, true);
586
587 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
589 {
590 pg_log_error("could not get system identifier: %s",
593 }
594 if (PQntuples(res) != 1)
595 {
596 pg_log_error("could not get system identifier: got %d rows, expected %d row",
597 PQntuples(res), 1);
599 }
600
601 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
602
603 pg_log_info("system identifier is %llu on publisher",
604 (unsigned long long) sysid);
605
606 PQclear(res);
608
609 return sysid;
610}
611
612/*
613 * Obtain the system identifier from control file. It will be used to compare
614 * if a data directory is a clone of another one. This routine is used locally
615 * and avoids a connection.
616 */
617static uint64
619{
620 ControlFileData *cf;
621 bool crc_ok;
622 uint64 sysid;
623
624 pg_log_info("getting system identifier from subscriber");
625
626 cf = get_controlfile(datadir, &crc_ok);
627 if (!crc_ok)
628 pg_fatal("control file appears to be corrupt");
629
630 sysid = cf->system_identifier;
631
632 pg_log_info("system identifier is %llu on subscriber",
633 (unsigned long long) sysid);
634
635 pg_free(cf);
636
637 return sysid;
638}
639
640/*
641 * Modify the system identifier. Since a standby server preserves the system
642 * identifier, it makes sense to change it to avoid situations in which WAL
643 * files from one of the systems might be used in the other one.
 *
 * A new identifier is computed into the control file data read from
 * subscriber_dir, then pg_resetwal is run on subscriber_dir. In dry-run mode
 * neither the guarded update nor pg_resetwal is executed; the values are
 * only logged. A pg_resetwal failure is fatal.
 *
 * NOTE(review): lines 646 (function name), 671 (the statement guarded by the
 * first "if (!dry_run)") and 679 (the remaining psprintf() arguments) are
 * missing from this extract. cmd_str is not freed in the visible code.
644 */
645static void
647{
648 ControlFileData *cf;
649 bool crc_ok;
650 struct timeval tv;
651
652 char *cmd_str;
653
654 pg_log_info("modifying system identifier of subscriber");
655
656 cf = get_controlfile(subscriber_dir, &crc_ok);
657 if (!crc_ok)
658 pg_fatal("control file appears to be corrupt");
659
660 /*
661 * Select a new system identifier.
662 *
663 * XXX this code was extracted from BootStrapXLOG().
 *
 * Layout: seconds since the epoch in the high 32 bits, microseconds
 * shifted left by 12, and the low 12 bits of the PID in the lowest bits.
664 */
665 gettimeofday(&tv, NULL);
666 cf->system_identifier = ((uint64) tv.tv_sec) << 32;
667 cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
668 cf->system_identifier |= getpid() & 0xFFF;
669
670 if (!dry_run)
672
673 pg_log_info("system identifier is %llu on subscriber",
674 (unsigned long long) cf->system_identifier);
675
676 pg_log_info("running pg_resetwal on the subscriber");
677
678 cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
680
681 pg_log_debug("pg_resetwal command is: %s", cmd_str);
682
683 if (!dry_run)
684 {
685 int rc = system(cmd_str);
686
687 if (rc == 0)
688 pg_log_info("subscriber successfully changed the system identifier");
689 else
690 pg_fatal("could not change system identifier of subscriber: %s", wait_result_to_str(rc));
691 }
692
693 pg_free(cf);
694}
695
696/*
697 * Generate an object name using a prefix, database oid and a random integer.
698 * It is used in case the user does not specify an object name (publication,
699 * subscription, replication slot).
 *
 * The random part comes from prng_state, which setup_publisher() seeds
 * before calling this. Returns a psprintf() string owned by the caller.
 *
 * NOTE(review): the function name line (702) and lines 712, 715, 716 and 723
 * (the status check and error exits) are missing from this extract. The local
 * "rand" shadows the C library function of the same name.
700 */
701static char *
703{
704 PGresult *res;
705 Oid oid;
706 uint32 rand;
707 char *objname;
708
709 res = PQexec(conn,
710 "SELECT oid FROM pg_catalog.pg_database "
711 "WHERE datname = pg_catalog.current_database()");
713 {
714 pg_log_error("could not obtain database OID: %s",
717 }
718
719 if (PQntuples(res) != 1)
720 {
721 pg_log_error("could not obtain database OID: got %d rows, expected %d row",
722 PQntuples(res), 1);
724 }
725
726 /* Database OID */
727 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
728
729 PQclear(res);
730
731 /* Random unsigned integer */
732 rand = pg_prng_uint32(&prng_state);
733
734 /*
735 * Build the object name. The name must not exceed NAMEDATALEN - 1. This
736 * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
737 * '\0').
 *
 * That is: 20 for the "pg_createsubscriber_" prefix, up to 10 decimal
 * digits for a 32-bit OID, 1 for the "_" separator, up to 8 hex digits
 * for a 32-bit random value, and the terminating '\0'.
 */
739 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
740
741 return objname;
742}
743
744/*
745 * Create the publications and replication slots in preparation for logical
746 * replication. Returns the LSN from latest replication slot. It will be the
747 * replication start point that is used to adjust the subscriptions (see
748 * set_replication_progress).
 *
 * Names the user did not supply are generated here and stored in dbinfo[].
 * Only the LSN of the last database's slot is kept; earlier ones are freed.
 *
 * NOTE(review): the function name line (751) and lines 811, 814, 815 and 820
 * are missing from this extract. genname is copied with pg_strdup() and is
 * not freed in the visible code.
749 */
750static char *
752{
753 char *lsn = NULL;
754
 /* Seed the PRNG that generate_object_name() uses for the name suffix. */
755 pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
756
757 for (int i = 0; i < num_dbs; i++)
758 {
759 PGconn *conn;
760 char *genname = NULL;
761
762 conn = connect_database(dbinfo[i].pubconninfo, true);
763
764 /*
765 * If an object name was not specified as command-line options, assign
766 * a generated object name. The replication slot has a different rule.
767 * The subscription name is assigned to the replication slot name if
768 * no replication slot is specified. It follows the same rule as
769 * CREATE SUBSCRIPTION.
770 */
771 if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
772 genname = generate_object_name(conn);
773 if (num_pubs == 0)
774 dbinfo[i].pubname = pg_strdup(genname);
775 if (num_subs == 0)
776 dbinfo[i].subname = pg_strdup(genname);
777 if (num_replslots == 0)
778 dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
779
780 /*
781 * Create publication on publisher. This step should be executed
782 * *before* promoting the subscriber to avoid any transactions between
783 * consistent LSN and the new publication rows (such transactions
784 * wouldn't see the new publication rows resulting in an error).
785 */
786 create_publication(conn, &dbinfo[i]);
787
788 /* Create replication slot on publisher */
789 if (lsn)
790 pg_free(lsn);
791 lsn = create_logical_replication_slot(conn, &dbinfo[i]);
 /*
 * A NULL lsn is a failure, except in dry-run mode, where presumably no
 * slot is actually created.
 */
792 if (lsn != NULL || dry_run)
793 pg_log_info("create replication slot \"%s\" on publisher",
794 dbinfo[i].replslotname);
795 else
796 exit(1);
797
798 /*
799 * Since we are using the LSN returned by the last replication slot as
800 * recovery_target_lsn, this LSN is ahead of the current WAL position
801 * and the recovery waits until the publisher writes a WAL record to
802 * reach the target and ends the recovery. On idle systems, this wait
803 * time is unpredictable and could lead to failure in promoting the
804 * subscriber. To avoid that, insert a harmless WAL record.
805 */
806 if (i == num_dbs - 1 && !dry_run)
807 {
808 PGresult *res;
809
810 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
812 {
813 pg_log_error("could not write an additional WAL record: %s",
816 }
817 PQclear(res);
818 }
819
821 }
822
823 return lsn;
824}
825
826/*
827 * Is recovery still in progress?
 *
 * Returns true when pg_catalog.pg_is_in_recovery() on conn returns 't'.
 *
 * NOTE(review): the function name line (830) and lines 837, 840 and 841
 * (the status check and error exit) are missing from this extract. The
 * visible code reads row 0 without checking PQntuples().
828 */
829static bool
831{
832 PGresult *res;
833 int ret;
834
835 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
836
838 {
839 pg_log_error("could not obtain recovery progress: %s",
842 }
843
844
845 ret = strcmp("t", PQgetvalue(res, 0, 0));
846
847 PQclear(res);
848
849 return ret == 0;
850}
851
852/*
853 * Is the primary server ready for logical replication?
854 *
855 * XXX Does it not allow a synchronous replica?
 *
 * Connects using the first database's publisher connection string.
 * Every problem found is logged, and the process exits with status 1 at the
 * end if any of these checks failed:
 * - wal_level is not "logical";
 * - there are fewer free replication slots than num_dbs;
 * - there are fewer free WAL senders than num_dbs.
 * A nonzero max_prepared_transactions without --enable-two-phase, and (in
 * dry-run mode only) max_slot_wal_keep_size other than -1, only produce
 * warnings.
 *
 * NOTE(review): lines 880, 883, 906, 909, 910, 933 and 979 are missing from
 * this extract.
856 */
857static void
858check_publisher(const struct LogicalRepInfo *dbinfo)
859{
860 PGconn *conn;
861 PGresult *res;
862 bool failed = false;
863
864 char *wal_level;
865 int max_repslots;
866 int cur_repslots;
867 int max_walsenders;
868 int cur_walsenders;
869 int max_prepared_transactions;
870 char *max_slot_wal_keep_size;
871
872 pg_log_info("checking settings on publisher");
873
874 conn = connect_database(dbinfo[0].pubconninfo, true);
875
876 /*
877 * If the primary server is in recovery (i.e. cascading replication),
878 * objects (publication) cannot be created because it is read only.
879 */
881 {
882 pg_log_error("primary server cannot be in recovery");
884 }
885
886 /*------------------------------------------------------------------------
887 * Logical replication requires a few parameters to be set on publisher.
888 * Since these parameters are not a requirement for physical replication,
889 * we should check it to make sure it won't fail.
890 *
891 * - wal_level = logical
892 * - max_replication_slots >= current + number of dbs to be converted
893 * - max_wal_senders >= current + number of dbs to be converted
894 * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
895 * -----------------------------------------------------------------------
896 */
897 res = PQexec(conn,
898 "SELECT pg_catalog.current_setting('wal_level'),"
899 " pg_catalog.current_setting('max_replication_slots'),"
900 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
901 " pg_catalog.current_setting('max_wal_senders'),"
902 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
903 " pg_catalog.current_setting('max_prepared_transactions'),"
904 " pg_catalog.current_setting('max_slot_wal_keep_size')");
905
907 {
908 pg_log_error("could not obtain publisher settings: %s",
911 }
912
 /* Columns are read in the order of the SELECT list above. */
913 wal_level = pg_strdup(PQgetvalue(res, 0, 0));
914 max_repslots = atoi(PQgetvalue(res, 0, 1));
915 cur_repslots = atoi(PQgetvalue(res, 0, 2));
916 max_walsenders = atoi(PQgetvalue(res, 0, 3));
917 cur_walsenders = atoi(PQgetvalue(res, 0, 4));
918 max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
919 max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
920
921 PQclear(res);
922
923 pg_log_debug("publisher: wal_level: %s", wal_level);
924 pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
925 pg_log_debug("publisher: current replication slots: %d", cur_repslots);
926 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
927 pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
928 pg_log_debug("publisher: max_prepared_transactions: %d",
929 max_prepared_transactions);
930 pg_log_debug("publisher: max_slot_wal_keep_size: %s",
931 max_slot_wal_keep_size);
932
934
935 if (strcmp(wal_level, "logical") != 0)
936 {
937 pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
938 failed = true;
939 }
940
 /* Each converted database needs one new slot and one new WAL sender. */
941 if (max_repslots - cur_repslots < num_dbs)
942 {
943 pg_log_error("publisher requires %d replication slots, but only %d remain",
944 num_dbs, max_repslots - cur_repslots);
945 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
946 "max_replication_slots", cur_repslots + num_dbs);
947 failed = true;
948 }
949
950 if (max_walsenders - cur_walsenders < num_dbs)
951 {
952 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
953 num_dbs, max_walsenders - cur_walsenders);
954 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
955 "max_wal_senders", cur_walsenders + num_dbs);
956 failed = true;
957 }
958
959 if (max_prepared_transactions != 0 && !dbinfos.two_phase)
960 {
961 pg_log_warning("two_phase option will not be enabled for replication slots");
962 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
963 "Prepared transactions will be replicated at COMMIT PREPARED.");
964 pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");
965 }
966
967 /*
968 * Validate 'max_slot_wal_keep_size'. If this parameter is set to a
969 * non-default value, it may cause replication failures due to required
970 * WAL files being prematurely removed.
 *
 * NOTE(review): this is only reported in dry-run mode, although the
 * header comment above lists max_slot_wal_keep_size = -1 among the
 * requirements; confirm that this asymmetry is intended.
971 */
972 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
973 {
974 pg_log_warning("required WAL could be removed from the publisher");
975 pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
976 "max_slot_wal_keep_size");
977 }
978
980
981 if (failed)
982 exit(1);
983}
984
985/*
986 * Is the standby server ready for logical replication?
987 *
988 * XXX Does it not allow a time-delayed replica?
989 *
990 * XXX In a cascaded replication scenario (P -> S -> C), if the target server
991 * is S, it cannot detect there is a replica (server C) because server S starts
992 * accepting only local connections and server C cannot connect to it. Hence,
993 * there is not a reliable way to provide a suitable error saying the server C
994 * will be broken at the end of this process (due to pg_resetwal).
 *
 * Connects using the first database's subscriber connection string. Every
 * problem is logged, and the process exits with status 1 at the end if any
 * of these is too low for num_dbs databases:
 * - max_replication_slots;
 * - max_logical_replication_workers;
 * - max_worker_processes (which needs num_dbs + 1).
 *
 * NOTE(review): lines 1012, 1015, 1039, 1040, 1047 and 1053 are missing from
 * this extract. Line 1047 is presumably where primary_slot_name is assigned.
995 */
996static void
997check_subscriber(const struct LogicalRepInfo *dbinfo)
998{
999 PGconn *conn;
1000 PGresult *res;
1001 bool failed = false;
1002
1003 int max_lrworkers;
1004 int max_repslots;
1005 int max_wprocs;
1006
1007 pg_log_info("checking settings on subscriber");
1008
1009 conn = connect_database(dbinfo[0].subconninfo, true);
1010
1011 /* The target server must be a standby */
1013 {
1014 pg_log_error("target server must be a standby");
1016 }
1017
1018 /*------------------------------------------------------------------------
1019 * Logical replication requires a few parameters to be set on subscriber.
1020 * Since these parameters are not a requirement for physical replication,
1021 * we should check it to make sure it won't fail.
1022 *
1023 * - max_replication_slots >= number of dbs to be converted
1024 * - max_logical_replication_workers >= number of dbs to be converted
1025 * - max_worker_processes >= 1 + number of dbs to be converted
1026 *------------------------------------------------------------------------
1027 */
1028 res = PQexec(conn,
1029 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1030 "'max_logical_replication_workers', "
1031 "'max_replication_slots', "
1032 "'max_worker_processes', "
1033 "'primary_slot_name') "
1034 "ORDER BY name");
1035
1036 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1037 {
1038 pg_log_error("could not obtain subscriber settings: %s",
1041 }
1042
 /*
 * Because of "ORDER BY name", the rows are:
 * 0 = max_logical_replication_workers, 1 = max_replication_slots,
 * 2 = max_worker_processes, 3 = primary_slot_name.
 * NOTE(review): this assumes all four rows are present; PQntuples() is not
 * checked here.
 */
1043 max_lrworkers = atoi(PQgetvalue(res, 0, 0));
1044 max_repslots = atoi(PQgetvalue(res, 1, 0));
1045 max_wprocs = atoi(PQgetvalue(res, 2, 0));
1046 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1048
1049 pg_log_debug("subscriber: max_logical_replication_workers: %d",
1050 max_lrworkers);
1051 pg_log_debug("subscriber: max_replication_slots: %d", max_repslots);
1052 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1054 pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1055
1056 PQclear(res);
1057
1058 disconnect_database(conn, false);
1059
1060 if (max_repslots < num_dbs)
1061 {
1062 pg_log_error("subscriber requires %d replication slots, but only %d remain",
1063 num_dbs, max_repslots);
1064 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1065 "max_replication_slots", num_dbs);
1066 failed = true;
1067 }
1068
1069 if (max_lrworkers < num_dbs)
1070 {
1071 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1072 num_dbs, max_lrworkers);
1073 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1074 "max_logical_replication_workers", num_dbs);
1075 failed = true;
1076 }
1077
 /* One worker per database plus one more (presumably the logical replication launcher). */
1078 if (max_wprocs < num_dbs + 1)
1079 {
1080 pg_log_error("subscriber requires %d worker processes, but only %d remain",
1081 num_dbs + 1, max_wprocs);
1082 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1083 "max_worker_processes", num_dbs + 1);
1084 failed = true;
1085 }
1086
1087 if (failed)
1088 exit(1);
1089}
1090
1091/*
1092 * Drop a specified subscription. This is to avoid duplicate subscriptions on
1093 * the primary (publisher node) and the newly created subscriber. We
1094 * shouldn't drop the associated slot as that would be used by the publisher
1095 * node.
1096 */
1097static void
1099{
1101 PGresult *res;
1102
1103 Assert(conn != NULL);
1104
1105 /*
1106 * Construct a query string. These commands are allowed to be executed
1107 * within a transaction.
1108 */
1109 appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1110 subname);
1111 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1112 subname);
1113 appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1114
1115 pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1116 subname, dbname);
1117
1118 if (!dry_run)
1119 {
1120 res = PQexec(conn, query->data);
1121
1122 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1123 {
1124 pg_log_error("could not drop subscription \"%s\": %s",
1127 }
1128
1129 PQclear(res);
1130 }
1131
1132 destroyPQExpBuffer(query);
1133}
1134
1135/*
1136 * Retrieve and drop the pre-existing subscriptions.
1137 */
1138static void
1140 const struct LogicalRepInfo *dbinfo)
1141{
1143 char *dbname;
1144 PGresult *res;
1145
1146 Assert(conn != NULL);
1147
1148 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1149
1150 appendPQExpBuffer(query,
1151 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1152 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1153 "WHERE d.datname = %s",
1154 dbname);
1155 res = PQexec(conn, query->data);
1156
1157 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1158 {
1159 pg_log_error("could not obtain pre-existing subscriptions: %s",
1162 }
1163
1164 for (int i = 0; i < PQntuples(res); i++)
1166 dbinfo->dbname);
1167
1168 PQclear(res);
1169 destroyPQExpBuffer(query);
1171}
1172
1173/*
1174 * Create the subscriptions, adjust the initial location for logical
1175 * replication and enable the subscriptions. That's the last step for logical
1176 * replication setup.
1177 */
1178static void
1179setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1180{
1181 for (int i = 0; i < num_dbs; i++)
1182 {
1183 PGconn *conn;
1184
1185 /* Connect to subscriber. */
1186 conn = connect_database(dbinfo[i].subconninfo, true);
1187
1188 /*
1189 * We don't need the pre-existing subscriptions on the newly formed
1190 * subscriber. They can connect to other publisher nodes and either
1191 * get some unwarranted data or can lead to ERRORs in connecting to
1192 * such nodes.
1193 */
1195
1196 /*
1197 * Since the publication was created before the consistent LSN, it is
1198 * available on the subscriber when the physical replica is promoted.
1199 * Remove publications from the subscriber because it has no use.
1200 */
1201 drop_publication(conn, &dbinfo[i]);
1202
1203 create_subscription(conn, &dbinfo[i]);
1204
1205 /* Set the replication progress to the correct LSN */
1206 set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1207
1208 /* Enable subscription */
1209 enable_subscription(conn, &dbinfo[i]);
1210
1211 disconnect_database(conn, false);
1212 }
1213}
1214
1215/*
1216 * Write the required recovery parameters.
1217 */
1218static void
1219setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1220{
1221 PGconn *conn;
1223
1224 /*
1225 * Although the recovery parameters will be written to the subscriber,
1226 * use a publisher connection. The primary_conninfo is generated using the
1227 * connection settings.
1228 */
1229 conn = connect_database(dbinfo[0].pubconninfo, true);
1230
1231 /*
1232 * Write recovery parameters.
1233 *
1234 * The subscriber is not running yet. In dry run mode, the recovery
1235 * parameters *won't* be written. An invalid LSN is used for printing
1236 * purposes. Additional recovery parameters are added here. It avoids
1237 * unexpected behavior such as end of recovery as soon as a consistent
1238 * state is reached (recovery_target) and failure due to multiple recovery
1239 * targets (name, time, xid, LSN).
1240 */
1242 appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
1244 "recovery_target_timeline = 'latest'\n");
1246 "recovery_target_inclusive = true\n");
1248 "recovery_target_action = promote\n");
1249 appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
1250 appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
1251 appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
1252
1253 if (dry_run)
1254 {
1255 appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
1257 "recovery_target_lsn = '%X/%X'\n",
1259 }
1260 else
1261 {
1262 appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1263 lsn);
1265 }
1266 disconnect_database(conn, false);
1267
1268 pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1269}
1270
1271/*
1272 * Drop physical replication slot on primary if the standby was using it. After
1273 * the transformation, it has no use.
1274 *
1275 * XXX we might not fail here. Instead, we provide a warning so the user
1276 * eventually drops this replication slot later.
1277 */
1278static void
1279drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
1280{
1281 PGconn *conn;
1282
1283 /* Replication slot does not exist, do nothing */
1284 if (!primary_slot_name)
1285 return;
1286
1287 conn = connect_database(dbinfo[0].pubconninfo, false);
1288 if (conn != NULL)
1289 {
1290 drop_replication_slot(conn, &dbinfo[0], slotname);
1291 disconnect_database(conn, false);
1292 }
1293 else
1294 {
1295 pg_log_warning("could not drop replication slot \"%s\" on primary",
1296 slotname);
1297 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1298 }
1299}
1300
1301/*
1302 * Drop failover replication slots on subscriber. After the transformation,
1303 * they have no use.
1304 *
1305 * XXX We do not fail here. Instead, we provide a warning so the user can drop
1306 * them later.
1307 */
1308static void
1310{
1311 PGconn *conn;
1312 PGresult *res;
1313
1314 conn = connect_database(dbinfo[0].subconninfo, false);
1315 if (conn != NULL)
1316 {
1317 /* Get failover replication slot names */
1318 res = PQexec(conn,
1319 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1320
1321 if (PQresultStatus(res) == PGRES_TUPLES_OK)
1322 {
1323 /* Remove failover replication slots from subscriber */
1324 for (int i = 0; i < PQntuples(res); i++)
1325 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1326 }
1327 else
1328 {
1329 pg_log_warning("could not obtain failover replication slot information: %s",
1331 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1332 }
1333
1334 PQclear(res);
1335 disconnect_database(conn, false);
1336 }
1337 else
1338 {
1339 pg_log_warning("could not drop failover replication slot");
1340 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1341 }
1342}
1343
1344/*
1345 * Create a logical replication slot and return its LSN.
1346 *
1347 * CreateReplicationSlot() is not used because it does not provide the one-row
1348 * result set that contains the LSN.
1349 */
1350static char *
1352{
1354 PGresult *res = NULL;
1355 const char *slot_name = dbinfo->replslotname;
1356 char *slot_name_esc;
1357 char *lsn = NULL;
1358
1359 Assert(conn != NULL);
1360
1361 pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
1362 slot_name, dbinfo->dbname);
1363
1364 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1365
1367 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1368 slot_name_esc,
1369 dbinfos.two_phase ? "true" : "false");
1370
1371 PQfreemem(slot_name_esc);
1372
1373 pg_log_debug("command is: %s", str->data);
1374
1375 if (!dry_run)
1376 {
1377 res = PQexec(conn, str->data);
1378 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1379 {
1380 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1381 slot_name, dbinfo->dbname,
1383 PQclear(res);
1385 return NULL;
1386 }
1387
1388 lsn = pg_strdup(PQgetvalue(res, 0, 0));
1389 PQclear(res);
1390 }
1391
1392 /* For cleanup purposes */
1393 dbinfo->made_replslot = true;
1394
1396
1397 return lsn;
1398}
1399
1400static void
1402 const char *slot_name)
1403{
1405 char *slot_name_esc;
1406 PGresult *res;
1407
1408 Assert(conn != NULL);
1409
1410 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1411 slot_name, dbinfo->dbname);
1412
1413 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1414
1415 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1416
1417 PQfreemem(slot_name_esc);
1418
1419 pg_log_debug("command is: %s", str->data);
1420
1421 if (!dry_run)
1422 {
1423 res = PQexec(conn, str->data);
1424 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1425 {
1426 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1427 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1428 dbinfo->made_replslot = false; /* don't try again. */
1429 }
1430
1431 PQclear(res);
1432 }
1433
1435}
1436
/*
 * Inspect the status returned by system() for a pg_ctl invocation.
 *
 * A zero status means pg_ctl succeeded and the function returns quietly.
 * Any other status is classified as one of three cases:
 * - a normal exit with a non-zero code;
 * - termination by a signal (or, on Windows, an exception);
 * - an unrecognized status.
 * The failure is logged together with the failing command, and the program
 * then exits with status 1.
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	/* Success: nothing to report */
	if (rc == 0)
		return;

	if (WIFEXITED(rc))
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	else if (WIFSIGNALED(rc))
	{
		int			sig = WTERMSIG(rc);

#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X", sig);
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 sig, pg_strsignal(sig));
#endif
	}
	else
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);

	/* Always show which command failed before bailing out */
	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1469
1470static void
1471start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1472 bool restrict_logical_worker)
1473{
1474 PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1475 int rc;
1476
1477 appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1478 appendShellString(pg_ctl_cmd, subscriber_dir);
1479 appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1480
1481 /* Prevent unintended slot invalidation */
1482 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1483
1484 if (restricted_access)
1485 {
1486 appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1487#if !defined(WIN32)
1488
1489 /*
1490 * An empty listen_addresses list means the server does not listen on
1491 * any IP interfaces; only Unix-domain sockets can be used to connect
1492 * to the server. Prevent external connections to minimize the chance
1493 * of failure.
1494 */
1495 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1496 if (opt->socket_dir)
1497 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1498 opt->socket_dir);
1499 appendPQExpBufferChar(pg_ctl_cmd, '"');
1500#endif
1501 }
1502 if (opt->config_file != NULL)
1503 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1504 opt->config_file);
1505
1506 /* Suppress to start logical replication if requested */
1507 if (restrict_logical_worker)
1508 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1509
1510 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1511 rc = system(pg_ctl_cmd->data);
1512 pg_ctl_status(pg_ctl_cmd->data, rc);
1513 standby_running = true;
1514 destroyPQExpBuffer(pg_ctl_cmd);
1515 pg_log_info("server was started");
1516}
1517
1518static void
1520{
1521 char *pg_ctl_cmd;
1522 int rc;
1523
1524 pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1525 datadir);
1526 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1527 rc = system(pg_ctl_cmd);
1528 pg_ctl_status(pg_ctl_cmd, rc);
1529 standby_running = false;
1530 pg_log_info("server was stopped");
1531}
1532
1533/*
1534 * Returns after the server finishes the recovery process.
1535 *
1536 * If recovery_timeout option is set, terminate abnormally without finishing
1537 * the recovery process. By default, it waits forever.
1538 *
1539 * XXX Is the recovery process still in progress? When recovery process has a
1540 * better progress reporting mechanism, it should be added here.
1541 */
1542static void
1543wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1544{
1545 PGconn *conn;
1546 int status = POSTMASTER_STILL_STARTING;
1547 int timer = 0;
1548
1549 pg_log_info("waiting for the target server to reach the consistent state");
1550
1551 conn = connect_database(conninfo, true);
1552
1553 for (;;)
1554 {
1555 bool in_recovery = server_is_in_recovery(conn);
1556
1557 /*
1558 * Does the recovery process finish? In dry run mode, there is no
1559 * recovery mode. Bail out as the recovery process has ended.
1560 */
1561 if (!in_recovery || dry_run)
1562 {
1563 status = POSTMASTER_READY;
1564 recovery_ended = true;
1565 break;
1566 }
1567
1568 /* Bail out after recovery_timeout seconds if this option is set */
1569 if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1570 {
1572 pg_log_error("recovery timed out");
1574 }
1575
1576 /* Keep waiting */
1578
1579 timer += WAIT_INTERVAL;
1580 }
1581
1582 disconnect_database(conn, false);
1583
1584 if (status == POSTMASTER_STILL_STARTING)
1585 pg_fatal("server did not end recovery");
1586
1587 pg_log_info("target server reached the consistent state");
1588 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1589}
1590
1591/*
1592 * Create a publication that includes all tables in the database.
1593 */
1594static void
1596{
1598 PGresult *res;
1599 char *ipubname_esc;
1600 char *spubname_esc;
1601
1602 Assert(conn != NULL);
1603
1604 ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1605 spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1606
1607 /* Check if the publication already exists */
1609 "SELECT 1 FROM pg_catalog.pg_publication "
1610 "WHERE pubname = %s",
1611 spubname_esc);
1612 res = PQexec(conn, str->data);
1613 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1614 {
1615 pg_log_error("could not obtain publication information: %s",
1618 }
1619
1620 if (PQntuples(res) == 1)
1621 {
1622 /*
1623 * Unfortunately, if it reaches this code path, it will always fail
1624 * (unless you decide to change the existing publication name). That's
1625 * bad but it is very unlikely that the user will choose a name with
1626 * pg_createsubscriber_ prefix followed by the exact database oid and
1627 * a random number.
1628 */
1629 pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1630 pg_log_error_hint("Consider renaming this publication before continuing.");
1632 }
1633
1634 PQclear(res);
1636
1637 pg_log_info("creating publication \"%s\" in database \"%s\"",
1638 dbinfo->pubname, dbinfo->dbname);
1639
1640 appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1641 ipubname_esc);
1642
1643 pg_log_debug("command is: %s", str->data);
1644
1645 if (!dry_run)
1646 {
1647 res = PQexec(conn, str->data);
1648 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1649 {
1650 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1651 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1653 }
1654 PQclear(res);
1655 }
1656
1657 /* For cleanup purposes */
1658 dbinfo->made_publication = true;
1659
1660 PQfreemem(ipubname_esc);
1661 PQfreemem(spubname_esc);
1663}
1664
1665/*
1666 * Remove publication if it couldn't finish all steps.
1667 */
1668static void
1670{
1672 PGresult *res;
1673 char *pubname_esc;
1674
1675 Assert(conn != NULL);
1676
1677 pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1678
1679 pg_log_info("dropping publication \"%s\" in database \"%s\"",
1680 dbinfo->pubname, dbinfo->dbname);
1681
1682 appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1683
1684 PQfreemem(pubname_esc);
1685
1686 pg_log_debug("command is: %s", str->data);
1687
1688 if (!dry_run)
1689 {
1690 res = PQexec(conn, str->data);
1691 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1692 {
1693 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1694 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1695 dbinfo->made_publication = false; /* don't try again. */
1696
1697 /*
1698 * Don't disconnect and exit here. This routine is used by primary
1699 * (cleanup publication / replication slot due to an error) and
1700 * subscriber (remove the replicated publications). In both cases,
1701 * it can continue and provide instructions for the user to remove
1702 * it later if cleanup fails.
1703 */
1704 }
1705 PQclear(res);
1706 }
1707
1709}
1710
1711/*
1712 * Create a subscription with some predefined options.
1713 *
1714 * A replication slot was already created in a previous step. Let's use it. It
1715 * is not required to copy data. The subscription will be created but it will
1716 * not be enabled now. That's because the replication progress must be set and
1717 * the replication origin name (one of the function arguments) contains the
1718 * subscription OID in its name. Once the subscription is created,
1719 * set_replication_progress() can obtain the chosen origin name and set up its
1720 * initial location.
1721 */
1722static void
1724{
1726 PGresult *res;
1727 char *pubname_esc;
1728 char *subname_esc;
1729 char *pubconninfo_esc;
1730 char *replslotname_esc;
1731
1732 Assert(conn != NULL);
1733
1734 pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1735 subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1736 pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1737 replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1738
1739 pg_log_info("creating subscription \"%s\" in database \"%s\"",
1740 dbinfo->subname, dbinfo->dbname);
1741
1743 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1744 "WITH (create_slot = false, enabled = false, "
1745 "slot_name = %s, copy_data = false, two_phase = %s)",
1746 subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
1747 dbinfos.two_phase ? "true" : "false");
1748
1749 PQfreemem(pubname_esc);
1750 PQfreemem(subname_esc);
1751 PQfreemem(pubconninfo_esc);
1752 PQfreemem(replslotname_esc);
1753
1754 pg_log_debug("command is: %s", str->data);
1755
1756 if (!dry_run)
1757 {
1758 res = PQexec(conn, str->data);
1759 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1760 {
1761 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1762 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
1764 }
1765 PQclear(res);
1766 }
1767
1769}
1770
1771/*
1772 * Sets the replication progress to the consistent LSN.
1773 *
1774 * The subscriber caught up to the consistent LSN provided by the last
1775 * replication slot that was created. The goal is to set up the initial
1776 * location for the logical replication that is the exact LSN at which the
1777 * subscriber was promoted. Once the subscription is enabled it will start
1778 * streaming from that location onwards. In dry run mode, the subscription OID
1779 * and LSN are set to invalid values for printing purposes.
1780 */
1781static void
1782set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1783{
1785 PGresult *res;
1786 Oid suboid;
1787 char *subname;
1788 char *dbname;
1789 char *originname;
1790 char *lsnstr;
1791
1792 Assert(conn != NULL);
1793
1794 subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
1795 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1796
1798 "SELECT s.oid FROM pg_catalog.pg_subscription s "
1799 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1800 "WHERE s.subname = %s AND d.datname = %s",
1801 subname, dbname);
1802
1803 res = PQexec(conn, str->data);
1804 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1805 {
1806 pg_log_error("could not obtain subscription OID: %s",
1809 }
1810
1811 if (PQntuples(res) != 1 && !dry_run)
1812 {
1813 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1814 PQntuples(res), 1);
1816 }
1817
1818 if (dry_run)
1819 {
1820 suboid = InvalidOid;
1822 }
1823 else
1824 {
1825 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1826 lsnstr = psprintf("%s", lsn);
1827 }
1828
1829 PQclear(res);
1830
1831 /*
1832 * The origin name is defined as pg_%u. %u is the subscription OID. See
1833 * ApplyWorkerMain().
1834 */
1835 originname = psprintf("pg_%u", suboid);
1836
1837 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1838 originname, lsnstr, dbinfo->dbname);
1839
1842 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1843 originname, lsnstr);
1844
1845 pg_log_debug("command is: %s", str->data);
1846
1847 if (!dry_run)
1848 {
1849 res = PQexec(conn, str->data);
1850 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1851 {
1852 pg_log_error("could not set replication progress for subscription \"%s\": %s",
1853 dbinfo->subname, PQresultErrorMessage(res));
1855 }
1856 PQclear(res);
1857 }
1858
1861 pg_free(originname);
1862 pg_free(lsnstr);
1864}
1865
1866/*
1867 * Enables the subscription.
1868 *
1869 * The subscription was created in a previous step but it was disabled. After
1870 * adjusting the initial logical replication location, enable the subscription.
1871 */
1872static void
1874{
1876 PGresult *res;
1877 char *subname;
1878
1879 Assert(conn != NULL);
1880
1881 subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1882
1883 pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1884 dbinfo->subname, dbinfo->dbname);
1885
1886 appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1887
1888 pg_log_debug("command is: %s", str->data);
1889
1890 if (!dry_run)
1891 {
1892 res = PQexec(conn, str->data);
1893 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1894 {
1895 pg_log_error("could not enable subscription \"%s\": %s",
1896 dbinfo->subname, PQresultErrorMessage(res));
1898 }
1899
1900 PQclear(res);
1901 }
1902
1905}
1906
1907int
1908main(int argc, char **argv)
1909{
1910 static struct option long_options[] =
1911 {
1912 {"database", required_argument, NULL, 'd'},
1913 {"pgdata", required_argument, NULL, 'D'},
1914 {"dry-run", no_argument, NULL, 'n'},
1915 {"subscriber-port", required_argument, NULL, 'p'},
1916 {"publisher-server", required_argument, NULL, 'P'},
1917 {"socketdir", required_argument, NULL, 's'},
1918 {"recovery-timeout", required_argument, NULL, 't'},
1919 {"enable-two-phase", no_argument, NULL, 'T'},
1920 {"subscriber-username", required_argument, NULL, 'U'},
1921 {"verbose", no_argument, NULL, 'v'},
1922 {"version", no_argument, NULL, 'V'},
1923 {"help", no_argument, NULL, '?'},
1924 {"config-file", required_argument, NULL, 1},
1925 {"publication", required_argument, NULL, 2},
1926 {"replication-slot", required_argument, NULL, 3},
1927 {"subscription", required_argument, NULL, 4},
1928 {NULL, 0, NULL, 0}
1929 };
1930
1931 struct CreateSubscriberOptions opt = {0};
1932
1933 int c;
1934 int option_index;
1935
1936 char *pub_base_conninfo;
1937 char *sub_base_conninfo;
1938 char *dbname_conninfo = NULL;
1939
1940 uint64 pub_sysid;
1941 uint64 sub_sysid;
1942 struct stat statbuf;
1943
1944 char *consistent_lsn;
1945
1946 char pidfile[MAXPGPATH];
1947
1948 pg_logging_init(argv[0]);
1950 progname = get_progname(argv[0]);
1951 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
1952
1953 if (argc > 1)
1954 {
1955 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
1956 {
1957 usage();
1958 exit(0);
1959 }
1960 else if (strcmp(argv[1], "-V") == 0
1961 || strcmp(argv[1], "--version") == 0)
1962 {
1963 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
1964 exit(0);
1965 }
1966 }
1967
1968 /* Default settings */
1969 subscriber_dir = NULL;
1970 opt.config_file = NULL;
1971 opt.pub_conninfo_str = NULL;
1972 opt.socket_dir = NULL;
1974 opt.sub_username = NULL;
1975 opt.two_phase = false;
1977 {
1978 0
1979 };
1980 opt.recovery_timeout = 0;
1981
1982 /*
1983 * Don't allow it to be run as root. It uses pg_ctl which does not allow
1984 * it either.
1985 */
1986#ifndef WIN32
1987 if (geteuid() == 0)
1988 {
1989 pg_log_error("cannot be executed by \"root\"");
1990 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
1991 progname);
1992 exit(1);
1993 }
1994#endif
1995
1997
1998 while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
1999 long_options, &option_index)) != -1)
2000 {
2001 switch (c)
2002 {
2003 case 'd':
2005 {
2007 num_dbs++;
2008 }
2009 else
2010 {
2011 pg_log_error("database \"%s\" specified more than once", optarg);
2012 exit(1);
2013 }
2014 break;
2015 case 'D':
2018 break;
2019 case 'n':
2020 dry_run = true;
2021 break;
2022 case 'p':
2023 opt.sub_port = pg_strdup(optarg);
2024 break;
2025 case 'P':
2027 break;
2028 case 's':
2031 break;
2032 case 't':
2033 opt.recovery_timeout = atoi(optarg);
2034 break;
2035 case 'T':
2036 opt.two_phase = true;
2037 break;
2038 case 'U':
2040 break;
2041 case 'v':
2043 break;
2044 case 1:
2046 break;
2047 case 2:
2049 {
2051 num_pubs++;
2052 }
2053 else
2054 {
2055 pg_log_error("publication \"%s\" specified more than once", optarg);
2056 exit(1);
2057 }
2058 break;
2059 case 3:
2061 {
2063 num_replslots++;
2064 }
2065 else
2066 {
2067 pg_log_error("replication slot \"%s\" specified more than once", optarg);
2068 exit(1);
2069 }
2070 break;
2071 case 4:
2073 {
2075 num_subs++;
2076 }
2077 else
2078 {
2079 pg_log_error("subscription \"%s\" specified more than once", optarg);
2080 exit(1);
2081 }
2082 break;
2083 default:
2084 /* getopt_long already emitted a complaint */
2085 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2086 exit(1);
2087 }
2088 }
2089
2090 /* Any non-option arguments? */
2091 if (optind < argc)
2092 {
2093 pg_log_error("too many command-line arguments (first is \"%s\")",
2094 argv[optind]);
2095 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2096 exit(1);
2097 }
2098
2099 /* Required arguments */
2100 if (subscriber_dir == NULL)
2101 {
2102 pg_log_error("no subscriber data directory specified");
2103 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2104 exit(1);
2105 }
2106
2107 /* If socket directory is not provided, use the current directory */
2108 if (opt.socket_dir == NULL)
2109 {
2110 char cwd[MAXPGPATH];
2111
2112 if (!getcwd(cwd, MAXPGPATH))
2113 pg_fatal("could not determine current directory");
2114 opt.socket_dir = pg_strdup(cwd);
2116 }
2117
2118 /*
2119 * Parse connection string. Build a base connection string that might be
2120 * reused by multiple databases.
2121 */
2122 if (opt.pub_conninfo_str == NULL)
2123 {
2124 /*
2125 * TODO use primary_conninfo (if available) from subscriber and
2126 * extract publisher connection string. Assume that there are
2127 * identical entries for physical and logical replication. If there is
2128 * not, we would fail anyway.
2129 */
2130 pg_log_error("no publisher connection string specified");
2131 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2132 exit(1);
2133 }
2134 pg_log_info("validating publisher connection string");
2135 pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2136 &dbname_conninfo);
2137 if (pub_base_conninfo == NULL)
2138 exit(1);
2139
2140 pg_log_info("validating subscriber connection string");
2141 sub_base_conninfo = get_sub_conninfo(&opt);
2142
2143 if (opt.database_names.head == NULL)
2144 {
2145 pg_log_info("no database was specified");
2146
2147 /*
2148 * If --database option is not provided, try to obtain the dbname from
2149 * the publisher conninfo. If dbname parameter is not available, error
2150 * out.
2151 */
2152 if (dbname_conninfo)
2153 {
2154 simple_string_list_append(&opt.database_names, dbname_conninfo);
2155 num_dbs++;
2156
2157 pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2158 dbname_conninfo);
2159 }
2160 else
2161 {
2162 pg_log_error("no database name specified");
2163 pg_log_error_hint("Try \"%s --help\" for more information.",
2164 progname);
2165 exit(1);
2166 }
2167 }
2168
2169 /* Number of object names must match number of databases */
2170 if (num_pubs > 0 && num_pubs != num_dbs)
2171 {
2172 pg_log_error("wrong number of publication names specified");
2173 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2174 num_pubs, num_dbs);
2175 exit(1);
2176 }
2177 if (num_subs > 0 && num_subs != num_dbs)
2178 {
2179 pg_log_error("wrong number of subscription names specified");
2180 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2181 num_subs, num_dbs);
2182 exit(1);
2183 }
2184 if (num_replslots > 0 && num_replslots != num_dbs)
2185 {
2186 pg_log_error("wrong number of replication slot names specified");
2187 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2189 exit(1);
2190 }
2191
2192 /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2193 pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2194 pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2195
2196 /* Rudimentary check for a data directory */
2198
2200
2201 /*
2202 * Store database information for publisher and subscriber. It should be
2203 * called before atexit() because its return is used in the
2204 * cleanup_objects_atexit().
2205 */
2206 dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2207
2208 /* Register a function to clean up objects in case of failure */
2209 atexit(cleanup_objects_atexit);
2210
2211 /*
2212 * Check if the subscriber data directory has the same system identifier
2213 * as the publisher data directory.
2214 */
2216 sub_sysid = get_standby_sysid(subscriber_dir);
2217 if (pub_sysid != sub_sysid)
2218 pg_fatal("subscriber data directory is not a copy of the source database cluster");
2219
2220 /* Subscriber PID file */
2221 snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2222
2223 /*
2224 * The standby server must not be running. If the server is started under
2225 * service manager and pg_createsubscriber stops it, the service manager
2226 * might react to this action and start the server again. Therefore,
2227 * refuse to proceed if the server is running to avoid possible failures.
2228 */
2229 if (stat(pidfile, &statbuf) == 0)
2230 {
2231 pg_log_error("standby server is running");
2232 pg_log_error_hint("Stop the standby server and try again.");
2233 exit(1);
2234 }
2235
2236 /*
2237 * Start a short-lived standby server with temporary parameters (provided
2238 * by command-line options). The goal is to avoid connections during the
2239 * transformation steps.
2240 */
2241 pg_log_info("starting the standby server with command-line options");
2242 start_standby_server(&opt, true, false);
2243
2244 /* Check if the standby server is ready for logical replication */
2246
2247 /* Check if the primary server is ready for logical replication */
2249
2250 /*
2251 * Stop the target server. The recovery process requires that the server
2252 * reaches a consistent state before targeting the recovery stop point.
2253 * Make sure a consistent state is reached (stopping the target server
2254 * guarantees it) *before* creating the replication slots in
2255 * setup_publisher().
2256 */
2257 pg_log_info("stopping the subscriber");
2259
2260 /* Create the required objects for each database on publisher */
2261 consistent_lsn = setup_publisher(dbinfos.dbinfo);
2262
2263 /* Write the required recovery parameters */
2264 setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2265
2266 /*
2267 * Start subscriber so the recovery parameters will take effect. Wait
2268 * until accepting connections. We don't want to start logical replication
2269 * during setup.
2270 */
2271 pg_log_info("starting the subscriber");
2272 start_standby_server(&opt, true, true);
2273
2274 /* Wait for the subscriber to be promoted */
2276
2277 /*
2278 * Create the subscription for each database on subscriber. It does not
2279 * enable it immediately because it needs to adjust the replication start
2280 * point to the LSN reported by setup_publisher(). It also cleans up
2281 * publications created by this tool and replication to the standby.
2282 */
2283 setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2284
2285 /* Remove primary_slot_name if it exists on primary */
2287
2288 /* Remove failover replication slots if they exist on subscriber */
2290
2291 /* Stop the subscriber */
2292 pg_log_info("stopping the subscriber");
2294
2295 /* Change system identifier from subscriber */
2297
2298 success = true;
2299
2300 pg_log_info("Done!");
2301
2302 return 0;
2303}
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1185
uint64_t uint64
Definition: c.h:503
uint32_t uint32
Definition: c.h:502
int find_my_exec(const char *argv0, char *retpath)
Definition: exec.c:160
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:429
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition: exec.c:310
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define _(x)
Definition: elog.c:90
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:792
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7368
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:6084
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7490
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5224
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7553
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
void PQclear(PGresult *res)
Definition: fe-exec.c:721
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4363
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4369
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define pg_malloc_array(type, count)
Definition: fe_memutils.h:56
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:25
#define required_argument
Definition: getopt_long.h:26
Assert(PointerIsAligned(start, uint64))
const char * str
long val
Definition: informix.c:689
int i
Definition: isn.c:74
@ CONNECTION_OK
Definition: libpq-fe.h:83
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:124
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:127
void pg_logging_increase_verbosity(void)
Definition: logging.c:185
void pg_logging_init(const char *argv0)
Definition: logging.c:83
void pg_logging_set_level(enum pg_log_level new_level)
Definition: logging.c:176
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_info(...)
Definition: logging.h:124
#define pg_log_warning_hint(...)
Definition: logging.h:121
#define pg_log_info_hint(...)
Definition: logging.h:130
@ PG_LOG_WARNING
Definition: logging.h:38
#define pg_log_warning_detail(...)
Definition: logging.h:118
#define pg_log_error_detail(...)
Definition: logging.h:109
#define pg_log_debug(...)
Definition: logging.h:133
#define pg_fatal(...)
static PQExpBuffer recoveryconfcontents
#define MAXPGPATH
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
#define WAIT_INTERVAL
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
static void stop_standby_server(const char *datadir)
static char * pg_ctl_path
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static char * get_exec_path(const char *argv0, const char *progname)
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static char * subscriber_dir
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
int main(int argc, char **argv)
static char * primary_slot_name
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static pg_prng_state prng_state
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
static void check_data_directory(const char *datadir)
static void usage()
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
@ POSTMASTER_READY
@ POSTMASTER_STILL_STARTING
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
#define USEC_PER_SEC
static char * get_base_conninfo(const char *conninfo, char **dbname)
static struct LogicalRepInfos dbinfos
static bool success
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static bool recovery_ended
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static void drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void disconnect_database(PGconn *conn, bool exit_on_error)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static bool dry_run
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static char * pg_resetwal_path
static char * generate_object_name(PGconn *conn)
static const char * progname
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
static char * argv0
Definition: pg_ctl.c:93
static char * exec_path
Definition: pg_ctl.c:88
PGDLLIMPORT int optind
Definition: getopt.c:51
PGDLLIMPORT char * optarg
Definition: getopt.c:53
uint32 pg_prng_uint32(pg_prng_state *state)
Definition: pg_prng.c:227
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition: pg_prng.c:89
char * datadir
NameData subname
static char * buf
Definition: pg_test_fsync.c:72
#define pg_log_warning(...)
Definition: pgfnames.c:24
const char * pg_strsignal(int signum)
Definition: pgstrsignal.c:39
void canonicalize_path(char *path)
Definition: path.c:337
#define snprintf
Definition: port.h:239
#define DEVNULL
Definition: port.h:161
const char * get_progname(const char *argv0)
Definition: path.c:652
#define printf(...)
Definition: port.h:245
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
char * c
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
Definition: recovery_gen.c:125
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
Definition: recovery_gen.c:28
void get_restricted_token(void)
void pg_usleep(long microsec)
Definition: signal.c:53
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition: simple_list.c:87
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition: simple_list.c:63
struct SimpleStringList SimpleStringList
char * dbname
Definition: streamutil.c:49
PGconn * conn
Definition: streamutil.c:52
void appendShellString(PQExpBuffer buf, const char *str)
Definition: string_utils.c:582
void appendConnStrVal(PQExpBuffer buf, const char *str)
Definition: string_utils.c:698
uint64 system_identifier
Definition: pg_control.h:110
SimpleStringList database_names
SimpleStringList replslot_names
struct LogicalRepInfo * dbinfo
char val[FLEXIBLE_ARRAY_MEMBER]
Definition: simple_list.h:37
struct SimpleStringListCell * next
Definition: simple_list.h:34
SimpleStringListCell * head
Definition: simple_list.h:42
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:33
#define stat
Definition: win32_port.h:274
#define WIFEXITED(w)
Definition: win32_port.h:150
#define WIFSIGNALED(w)
Definition: win32_port.h:151
#define WTERMSIG(w)
Definition: win32_port.h:153
#define WEXITSTATUS(w)
Definition: win32_port.h:152
int gettimeofday(struct timeval *tp, void *tzp)
int wal_level
Definition: xlog.c:131
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28