PostgreSQL Source Code  git master
pg_createsubscriber.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_createsubscriber.c
4  * Create a new logical replica from a standby server
5  *
6  * Copyright (C) 2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/bin/pg_basebackup/pg_createsubscriber.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 
14 #include "postgres_fe.h"
15 
16 #include <sys/time.h>
17 #include <sys/wait.h>
18 #include <time.h>
19 
20 #include "catalog/pg_authid_d.h"
21 #include "common/connect.h"
23 #include "common/file_perm.h"
24 #include "common/logging.h"
25 #include "common/pg_prng.h"
27 #include "fe_utils/recovery_gen.h"
28 #include "fe_utils/simple_list.h"
29 #include "fe_utils/string_utils.h"
30 #include "getopt_long.h"
31 
32 #define DEFAULT_SUB_PORT "50432"
33 
34 /* Command-line options */
36 {
37  char *config_file; /* configuration file */
38  char *pub_conninfo_str; /* publisher connection string */
39  char *socket_dir; /* directory for Unix-domain socket, if any */
40  char *sub_port; /* subscriber port number */
41  const char *sub_username; /* subscriber username */
42  SimpleStringList database_names; /* list of database names */
43  SimpleStringList pub_names; /* list of publication names */
44  SimpleStringList sub_names; /* list of subscription names */
45  SimpleStringList replslot_names; /* list of replication slot names */
46  int recovery_timeout; /* stop recovery after this time */
47 };
48 
50 {
51  char *dbname; /* database name */
52  char *pubconninfo; /* publisher connection string */
53  char *subconninfo; /* subscriber connection string */
54  char *pubname; /* publication name */
55  char *subname; /* subscription name */
56  char *replslotname; /* replication slot name */
57 
58  bool made_replslot; /* replication slot was created */
59  bool made_publication; /* publication was created */
60 };
61 
62 static void cleanup_objects_atexit(void);
/* (void), not (): a real prototype that matches the definition of usage() */
static void usage(void);
64 static char *get_base_conninfo(const char *conninfo, char **dbname);
65 static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
66 static char *get_exec_path(const char *argv0, const char *progname);
67 static void check_data_directory(const char *datadir);
68 static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
69 static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
70  const char *pub_base_conninfo,
71  const char *sub_base_conninfo);
72 static PGconn *connect_database(const char *conninfo, bool exit_on_error);
73 static void disconnect_database(PGconn *conn, bool exit_on_error);
74 static uint64 get_primary_sysid(const char *conninfo);
75 static uint64 get_standby_sysid(const char *datadir);
76 static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
77 static bool server_is_in_recovery(PGconn *conn);
78 static char *generate_object_name(PGconn *conn);
79 static void check_publisher(const struct LogicalRepInfo *dbinfo);
80 static char *setup_publisher(struct LogicalRepInfo *dbinfo);
81 static void check_subscriber(const struct LogicalRepInfo *dbinfo);
82 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
83  const char *consistent_lsn);
84 static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
85  const char *lsn);
87  const char *slotname);
90  struct LogicalRepInfo *dbinfo);
92  const char *slot_name);
93 static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
94 static void start_standby_server(const struct CreateSubscriberOptions *opt,
95  bool restricted_access,
96  bool restrict_logical_worker);
97 static void stop_standby_server(const char *datadir);
98 static void wait_for_end_recovery(const char *conninfo,
99  const struct CreateSubscriberOptions *opt);
100 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
101 static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
102 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
103 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
104  const char *lsn);
105 static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
107  const struct LogicalRepInfo *dbinfo);
108 static void drop_existing_subscriptions(PGconn *conn, const char *subname,
109  const char *dbname);
110 
111 #define USEC_PER_SEC 1000000
112 #define WAIT_INTERVAL 1 /* 1 second */
113 
114 static const char *progname;
115 
116 static char *primary_slot_name = NULL;
117 static bool dry_run = false;
118 
119 static bool success = false;
120 
121 static struct LogicalRepInfo *dbinfo;
122 static int num_dbs = 0; /* number of specified databases */
123 static int num_pubs = 0; /* number of specified publications */
124 static int num_subs = 0; /* number of specified subscriptions */
125 static int num_replslots = 0; /* number of specified replication slots */
126 
128 
129 static char *pg_ctl_path = NULL;
130 static char *pg_resetwal_path = NULL;
131 
132 /* standby / subscriber data directory */
133 static char *subscriber_dir = NULL;
134 
135 static bool recovery_ended = false;
136 static bool standby_running = false;
137 
139 {
142 };
143 
144 
145 /*
146  * Clean up objects that were created by pg_createsubscriber if there is an
147  * error.
148  *
149  * Publications and replication slots are created on the primary. Depending on
150  * the step at which it failed, the already created objects are removed when
151  * possible (sometimes this won't work due to a connection issue).
152  * There is no cleanup on the target server. The steps on the target server are
153  * executed *after* promotion; hence, at this point, a failure means that the
154  * physical replica must be recreated before starting again.
155  */
156 static void
158 {
159  if (success)
160  return;
161 
162  /*
163  * If the server is promoted, there is no way to use the current setup
164  * again. Warn the user that a new replication setup should be done before
165  * trying again.
166  */
167  if (recovery_ended)
168  {
169  pg_log_warning("failed after the end of recovery");
170  pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
171  "You must recreate the physical replica before continuing.");
172  }
173 
174  for (int i = 0; i < num_dbs; i++)
175  {
177  {
178  PGconn *conn;
179 
181  if (conn != NULL)
182  {
185  if (dbinfo[i].made_replslot)
187  disconnect_database(conn, false);
188  }
189  else
190  {
191  /*
192  * If a connection could not be established, inform the user
193  * that some objects were left on primary and should be
194  * removed before trying again.
195  */
197  {
198  pg_log_warning("publication \"%s\" in database \"%s\" on primary might be left behind",
200  pg_log_warning_hint("Consider dropping this publication before trying again.");
201  }
202  if (dbinfo[i].made_replslot)
203  {
204  pg_log_warning("replication slot \"%s\" in database \"%s\" on primary might be left behind",
206  pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
207  }
208  }
209  }
210  }
211 
212  if (standby_running)
214 }
215 
/*
 * Print the help text to standard output: a one-line description, the usage
 * synopsis, the list of options, and the bug-report and home-page lines.
 * Every message is passed through _() before being handed to printf().
 */
216 static void
217 usage(void)
218 {
219  printf(_("%s creates a new logical replica from a standby server.\n\n"),
220  progname);
221  printf(_("Usage:\n"));
222  printf(_(" %s [OPTION]...\n"), progname);
223  printf(_("\nOptions:\n"));
224  printf(_(" -d, --database=DBNAME database to create a subscription\n"));
225  printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
226  printf(_(" -n, --dry-run dry run, just show what would be done\n"));
 /* The default port shown is the compile-time DEFAULT_SUB_PORT string. */
227  printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
228  printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
229  printf(_(" -s, --socket-directory=DIR socket directory to use (default current directory)\n"));
230  printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
231  printf(_(" -U, --subscriber-username=NAME subscriber username\n"));
232  printf(_(" -v, --verbose output verbose messages\n"));
233  printf(_(" --config-file=FILENAME use specified main server configuration\n"
234  " file when running target cluster\n"));
235  printf(_(" --publication=NAME publication name\n"));
236  printf(_(" --replication-slot=NAME replication slot name\n"));
237  printf(_(" --subscription=NAME subscription name\n"));
238  printf(_(" -V, --version output version information, then exit\n"));
239  printf(_(" -?, --help show this help, then exit\n"));
240  printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
241  printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
242 }
243 
244 /*
245  * Subroutine to append "keyword=value" to a connection string,
246  * with proper quoting of the value. (We assume keywords don't need that.)
247  */
248 static void
249 appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
250 {
251  if (buf->len > 0)
253  appendPQExpBufferStr(buf, keyword);
256 }
257 
258 /*
259  * Validate a connection string. Returns a base connection string, that is, a
260  * connection string without a database name, or NULL if parsing fails.
261  *
262  * Since we might process multiple databases, each database name will be
263  * appended to this base connection string to provide a final connection
264  * string. If dbname is not null and the connection string contains a
265  * database name, a copy of that name is stored in *dbname.
266  *
267  * It is the caller's responsibility to free the returned connection string and
268  * dbname.
269  */
270 static char *
271 get_base_conninfo(const char *conninfo, char **dbname)
272 {
274  PQconninfoOption *conn_opts;
275  PQconninfoOption *conn_opt;
276  char *errmsg = NULL;
277  char *ret;
278 
279  conn_opts = PQconninfoParse(conninfo, &errmsg);
280  if (conn_opts == NULL)
281  {
282  pg_log_error("could not parse connection string: %s", errmsg);
283  PQfreemem(errmsg);
284  return NULL;
285  }
286 
288  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
289  {
290  if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
291  {
292  if (strcmp(conn_opt->keyword, "dbname") == 0)
293  {
294  if (dbname)
295  *dbname = pg_strdup(conn_opt->val);
296  continue;
297  }
298  appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
299  }
300  }
301 
302  ret = pg_strdup(buf->data);
303 
305  PQconninfoFree(conn_opts);
306 
307  return ret;
308 }
309 
310 /*
311  * Build a subscriber connection string. Only a few parameters are supported
312  * since it starts a server with restricted access.
313  */
314 static char *
316 {
318  char *ret;
319 
320  appendConnStrItem(buf, "port", opt->sub_port);
321 #if !defined(WIN32)
322  appendConnStrItem(buf, "host", opt->socket_dir);
323 #endif
324  if (opt->sub_username != NULL)
325  appendConnStrItem(buf, "user", opt->sub_username);
326  appendConnStrItem(buf, "fallback_application_name", progname);
327 
328  ret = pg_strdup(buf->data);
329 
331 
332  return ret;
333 }
334 
335 /*
336  * Verify if a PostgreSQL binary (progname) is available in the same directory as
337  * pg_createsubscriber and it has the same version. It returns the absolute
338  * path of the progname.
339  */
340 static char *
341 get_exec_path(const char *argv0, const char *progname)
342 {
343  char *versionstr;
344  char *exec_path;
345  int ret;
346 
347  versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
349  ret = find_other_exec(argv0, progname, versionstr, exec_path);
350 
351  if (ret < 0)
352  {
353  char full_path[MAXPGPATH];
354 
355  if (find_my_exec(argv0, full_path) < 0)
356  strlcpy(full_path, progname, sizeof(full_path));
357 
358  if (ret == -1)
359  pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
360  progname, "pg_createsubscriber", full_path);
361  else
362  pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
363  progname, full_path, "pg_createsubscriber");
364  }
365 
366  pg_log_debug("%s path is: %s", progname, exec_path);
367 
368  return exec_path;
369 }
370 
371 /*
372  * Is it a cluster directory? These are preliminary checks. It is far from
373  * making an accurate check. If it is not a clone from the publisher, it will
374  * eventually fail in a future step.
375  */
376 static void
378 {
379  struct stat statbuf;
380  char versionfile[MAXPGPATH];
381 
382  pg_log_info("checking if directory \"%s\" is a cluster data directory",
383  datadir);
384 
385  if (stat(datadir, &statbuf) != 0)
386  {
387  if (errno == ENOENT)
388  pg_fatal("data directory \"%s\" does not exist", datadir);
389  else
390  pg_fatal("could not access directory \"%s\": %m", datadir);
391  }
392 
393  snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
394  if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
395  {
396  pg_fatal("directory \"%s\" is not a database cluster directory",
397  datadir);
398  }
399 }
400 
401 /*
402  * Append database name into a base connection string.
403  *
404  * dbname is the only parameter that changes so it is not included in the base
405  * connection string. This function concatenates dbname to build a "real"
406  * connection string.
407  */
408 static char *
409 concat_conninfo_dbname(const char *conninfo, const char *dbname)
410 {
412  char *ret;
413 
414  Assert(conninfo != NULL);
415 
416  appendPQExpBufferStr(buf, conninfo);
417  appendConnStrItem(buf, "dbname", dbname);
418 
419  ret = pg_strdup(buf->data);
421 
422  return ret;
423 }
424 
425 /*
426  * Store publication and subscription information.
427  *
428  * If publication, replication slot and subscription names were specified,
429  * store them here. Otherwise, a generated name will be assigned to the object
430  * in setup_publisher().
431  */
432 static struct LogicalRepInfo *
434  const char *pub_base_conninfo,
435  const char *sub_base_conninfo)
436 {
437  struct LogicalRepInfo *dbinfo;
438  SimpleStringListCell *pubcell = NULL;
439  SimpleStringListCell *subcell = NULL;
440  SimpleStringListCell *replslotcell = NULL;
441  int i = 0;
442 
444 
445  if (num_pubs > 0)
446  pubcell = opt->pub_names.head;
447  if (num_subs > 0)
448  subcell = opt->sub_names.head;
449  if (num_replslots > 0)
450  replslotcell = opt->replslot_names.head;
451 
452  for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
453  {
454  char *conninfo;
455 
456  /* Fill publisher attributes */
457  conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
458  dbinfo[i].pubconninfo = conninfo;
459  dbinfo[i].dbname = cell->val;
460  if (num_pubs > 0)
461  dbinfo[i].pubname = pubcell->val;
462  else
463  dbinfo[i].pubname = NULL;
464  if (num_replslots > 0)
465  dbinfo[i].replslotname = replslotcell->val;
466  else
467  dbinfo[i].replslotname = NULL;
468  dbinfo[i].made_replslot = false;
469  dbinfo[i].made_publication = false;
470  /* Fill subscriber attributes */
471  conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
472  dbinfo[i].subconninfo = conninfo;
473  if (num_subs > 0)
474  dbinfo[i].subname = subcell->val;
475  else
476  dbinfo[i].subname = NULL;
477  /* Other fields will be filled later */
478 
479  pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
480  dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
481  dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
482  dbinfo[i].pubconninfo);
483  pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
484  dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
485  dbinfo[i].subconninfo);
486 
487  if (num_pubs > 0)
488  pubcell = pubcell->next;
489  if (num_subs > 0)
490  subcell = subcell->next;
491  if (num_replslots > 0)
492  replslotcell = replslotcell->next;
493 
494  i++;
495  }
496 
497  return dbinfo;
498 }
499 
500 /*
501  * Open a new connection. On failure, exit immediately if exit_on_error is
502  * true; otherwise return NULL.
503  */
504 static PGconn *
505 connect_database(const char *conninfo, bool exit_on_error)
506 {
507  PGconn *conn;
508  PGresult *res;
509 
510  conn = PQconnectdb(conninfo);
511  if (PQstatus(conn) != CONNECTION_OK)
512  {
513  pg_log_error("connection to database failed: %s",
515  PQfinish(conn);
516 
517  if (exit_on_error)
518  exit(1);
519  return NULL;
520  }
521 
522  /* Secure search_path */
525  {
526  pg_log_error("could not clear search_path: %s",
528  PQclear(res);
529  PQfinish(conn);
530 
531  if (exit_on_error)
532  exit(1);
533  return NULL;
534  }
535  PQclear(res);
536 
537  return conn;
538 }
539 
540 /*
541  * Close the connection. If exit_on_error is true, it has an undesired
542  * condition and it should exit immediately.
543  */
544 static void
545 disconnect_database(PGconn *conn, bool exit_on_error)
546 {
547  Assert(conn != NULL);
548 
549  PQfinish(conn);
550 
551  if (exit_on_error)
552  exit(1);
553 }
554 
555 /*
556  * Obtain the system identifier using the provided connection. It will be used
557  * to compare if a data directory is a clone of another one.
558  */
559 static uint64
560 get_primary_sysid(const char *conninfo)
561 {
562  PGconn *conn;
563  PGresult *res;
564  uint64 sysid;
565 
566  pg_log_info("getting system identifier from publisher");
567 
568  conn = connect_database(conninfo, true);
569 
570  res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
572  {
573  pg_log_error("could not get system identifier: %s",
575  disconnect_database(conn, true);
576  }
577  if (PQntuples(res) != 1)
578  {
579  pg_log_error("could not get system identifier: got %d rows, expected %d row",
580  PQntuples(res), 1);
581  disconnect_database(conn, true);
582  }
583 
584  sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
585 
586  pg_log_info("system identifier is %llu on publisher",
587  (unsigned long long) sysid);
588 
589  PQclear(res);
590  disconnect_database(conn, false);
591 
592  return sysid;
593 }
594 
595 /*
596  * Obtain the system identifier from control file. It will be used to compare
597  * if a data directory is a clone of another one. This routine is used locally
598  * and avoids a connection.
599  */
600 static uint64
602 {
603  ControlFileData *cf;
604  bool crc_ok;
605  uint64 sysid;
606 
607  pg_log_info("getting system identifier from subscriber");
608 
609  cf = get_controlfile(datadir, &crc_ok);
610  if (!crc_ok)
611  pg_fatal("control file appears to be corrupt");
612 
613  sysid = cf->system_identifier;
614 
615  pg_log_info("system identifier is %llu on subscriber",
616  (unsigned long long) sysid);
617 
618  pg_free(cf);
619 
620  return sysid;
621 }
622 
623 /*
624  * Modify the system identifier. Since a standby server preserves the system
625  * identifier, it makes sense to change it to avoid situations in which WAL
626  * files from one of the systems might be used in the other one.
627  */
628 static void
630 {
631  ControlFileData *cf;
632  bool crc_ok;
633  struct timeval tv;
634 
635  char *cmd_str;
636 
637  pg_log_info("modifying system identifier of subscriber");
638 
639  cf = get_controlfile(subscriber_dir, &crc_ok);
640  if (!crc_ok)
641  pg_fatal("control file appears to be corrupt");
642 
643  /*
644  * Select a new system identifier.
645  *
646  * XXX this code was extracted from BootStrapXLOG().
647  */
648  gettimeofday(&tv, NULL);
649  cf->system_identifier = ((uint64) tv.tv_sec) << 32;
650  cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
651  cf->system_identifier |= getpid() & 0xFFF;
652 
653  if (!dry_run)
655 
656  pg_log_info("system identifier is %llu on subscriber",
657  (unsigned long long) cf->system_identifier);
658 
659  pg_log_info("running pg_resetwal on the subscriber");
660 
661  cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
663 
664  pg_log_debug("pg_resetwal command is: %s", cmd_str);
665 
666  if (!dry_run)
667  {
668  int rc = system(cmd_str);
669 
670  if (rc == 0)
671  pg_log_info("subscriber successfully changed the system identifier");
672  else
673  pg_fatal("subscriber failed to change system identifier: exit code: %d", rc);
674  }
675 
676  pg_free(cf);
677 }
678 
679 /*
680  * Generate an object name using a prefix, database oid and a random integer.
681  * It is used in case the user does not specify an object name (publication,
682  * subscription, replication slot).
683  */
684 static char *
686 {
687  PGresult *res;
688  Oid oid;
689  uint32 rand;
690  char *objname;
691 
692  res = PQexec(conn,
693  "SELECT oid FROM pg_catalog.pg_database "
694  "WHERE datname = pg_catalog.current_database()");
696  {
697  pg_log_error("could not obtain database OID: %s",
699  disconnect_database(conn, true);
700  }
701 
702  if (PQntuples(res) != 1)
703  {
704  pg_log_error("could not obtain database OID: got %d rows, expected %d row",
705  PQntuples(res), 1);
706  disconnect_database(conn, true);
707  }
708 
709  /* Database OID */
710  oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
711 
712  PQclear(res);
713 
714  /* Random unsigned integer */
715  rand = pg_prng_uint32(&prng_state);
716 
717  /*
718  * Build the object name. The name must not exceed NAMEDATALEN - 1. This
719  * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
720  * '\0').
721  */
722  objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
723 
724  return objname;
725 }
726 
727 /*
728  * Create the publications and replication slots in preparation for logical
729  * replication. Returns the LSN from latest replication slot. It will be the
730  * replication start point that is used to adjust the subscriptions (see
731  * set_replication_progress).
732  */
733 static char *
735 {
736  char *lsn = NULL;
737 
738  pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
739 
740  for (int i = 0; i < num_dbs; i++)
741  {
742  PGconn *conn;
743  char *genname = NULL;
744 
745  conn = connect_database(dbinfo[i].pubconninfo, true);
746 
747  /*
748  * If an object name was not specified as command-line options, assign
749  * a generated object name. The replication slot has a different rule.
750  * The subscription name is assigned to the replication slot name if
751  * no replication slot is specified. It follows the same rule as
752  * CREATE SUBSCRIPTION.
753  */
754  if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
755  genname = generate_object_name(conn);
756  if (num_pubs == 0)
757  dbinfo[i].pubname = pg_strdup(genname);
758  if (num_subs == 0)
759  dbinfo[i].subname = pg_strdup(genname);
760  if (num_replslots == 0)
762 
763  /*
764  * Create publication on publisher. This step should be executed
765  * *before* promoting the subscriber to avoid any transactions between
766  * consistent LSN and the new publication rows (such transactions
767  * wouldn't see the new publication rows resulting in an error).
768  */
770 
771  /* Create replication slot on publisher */
772  if (lsn)
773  pg_free(lsn);
775  if (lsn != NULL || dry_run)
776  pg_log_info("create replication slot \"%s\" on publisher",
777  dbinfo[i].replslotname);
778  else
779  exit(1);
780 
781  disconnect_database(conn, false);
782  }
783 
784  return lsn;
785 }
786 
787 /*
788  * Is recovery still in progress?
789  */
790 static bool
792 {
793  PGresult *res;
794  int ret;
795 
796  res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
797 
799  {
800  pg_log_error("could not obtain recovery progress: %s",
802  disconnect_database(conn, true);
803  }
804 
805 
806  ret = strcmp("t", PQgetvalue(res, 0, 0));
807 
808  PQclear(res);
809 
810  return ret == 0;
811 }
812 
813 /*
814  * Is the primary server ready for logical replication?
815  *
816  * XXX Does it not allow a synchronous replica?
817  */
818 static void
820 {
821  PGconn *conn;
822  PGresult *res;
823  bool failed = false;
824 
825  char *wal_level;
826  int max_repslots;
827  int cur_repslots;
828  int max_walsenders;
829  int cur_walsenders;
830  int max_prepared_transactions;
831 
832  pg_log_info("checking settings on publisher");
833 
834  conn = connect_database(dbinfo[0].pubconninfo, true);
835 
836  /*
837  * If the primary server is in recovery (i.e. cascading replication),
838  * objects (publication) cannot be created because it is read only.
839  */
841  {
842  pg_log_error("primary server cannot be in recovery");
843  disconnect_database(conn, true);
844  }
845 
846  /*------------------------------------------------------------------------
847  * Logical replication requires a few parameters to be set on publisher.
848  * Since these parameters are not a requirement for physical replication,
849  * we should check it to make sure it won't fail.
850  *
851  * - wal_level = logical
852  * - max_replication_slots >= current + number of dbs to be converted
853  * - max_wal_senders >= current + number of dbs to be converted
854  * -----------------------------------------------------------------------
855  */
856  res = PQexec(conn,
857  "SELECT pg_catalog.current_setting('wal_level'),"
858  " pg_catalog.current_setting('max_replication_slots'),"
859  " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
860  " pg_catalog.current_setting('max_wal_senders'),"
861  " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
862  " pg_catalog.current_setting('max_prepared_transactions')");
863 
865  {
866  pg_log_error("could not obtain publisher settings: %s",
868  disconnect_database(conn, true);
869  }
870 
871  wal_level = pg_strdup(PQgetvalue(res, 0, 0));
872  max_repslots = atoi(PQgetvalue(res, 0, 1));
873  cur_repslots = atoi(PQgetvalue(res, 0, 2));
874  max_walsenders = atoi(PQgetvalue(res, 0, 3));
875  cur_walsenders = atoi(PQgetvalue(res, 0, 4));
876  max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
877 
878  PQclear(res);
879 
880  pg_log_debug("publisher: wal_level: %s", wal_level);
881  pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
882  pg_log_debug("publisher: current replication slots: %d", cur_repslots);
883  pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
884  pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
885  pg_log_debug("publisher: max_prepared_transactions: %d",
886  max_prepared_transactions);
887 
888  disconnect_database(conn, false);
889 
890  if (strcmp(wal_level, "logical") != 0)
891  {
892  pg_log_error("publisher requires wal_level >= \"logical\"");
893  failed = true;
894  }
895 
896  if (max_repslots - cur_repslots < num_dbs)
897  {
898  pg_log_error("publisher requires %d replication slots, but only %d remain",
899  num_dbs, max_repslots - cur_repslots);
900  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
901  "max_replication_slots", cur_repslots + num_dbs);
902  failed = true;
903  }
904 
905  if (max_walsenders - cur_walsenders < num_dbs)
906  {
907  pg_log_error("publisher requires %d wal sender processes, but only %d remain",
908  num_dbs, max_walsenders - cur_walsenders);
909  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
910  "max_wal_senders", cur_walsenders + num_dbs);
911  failed = true;
912  }
913 
914  if (max_prepared_transactions != 0)
915  {
916  pg_log_warning("two_phase option will not be enabled for slots");
917  pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
918  "Prepared transactions will be replicated at COMMIT PREPARED.");
919  }
920 
922 
923  if (failed)
924  exit(1);
925 }
926 
927 /*
928  * Is the standby server ready for logical replication?
929  *
930  * XXX Does it not allow a time-delayed replica?
931  *
932  * XXX In a cascaded replication scenario (P -> S -> C), if the target server
933  * is S, it cannot detect there is a replica (server C) because server S starts
934  * accepting only local connections and server C cannot connect to it. Hence,
935  * there is not a reliable way to provide a suitable error saying the server C
936  * will be broken at the end of this process (due to pg_resetwal).
937  */
938 static void
940 {
941  PGconn *conn;
942  PGresult *res;
943  bool failed = false;
944 
945  int max_lrworkers;
946  int max_repslots;
947  int max_wprocs;
948 
949  pg_log_info("checking settings on subscriber");
950 
951  conn = connect_database(dbinfo[0].subconninfo, true);
952 
953  /* The target server must be a standby */
955  {
956  pg_log_error("target server must be a standby");
957  disconnect_database(conn, true);
958  }
959 
960  /*------------------------------------------------------------------------
961  * Logical replication requires a few parameters to be set on subscriber.
962  * Since these parameters are not a requirement for physical replication,
963  * we should check it to make sure it won't fail.
964  *
965  * - max_replication_slots >= number of dbs to be converted
966  * - max_logical_replication_workers >= number of dbs to be converted
967  * - max_worker_processes >= 1 + number of dbs to be converted
968  *------------------------------------------------------------------------
969  */
970  res = PQexec(conn,
971  "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
972  "'max_logical_replication_workers', "
973  "'max_replication_slots', "
974  "'max_worker_processes', "
975  "'primary_slot_name') "
976  "ORDER BY name");
977 
979  {
980  pg_log_error("could not obtain subscriber settings: %s",
982  disconnect_database(conn, true);
983  }
984 
985  max_lrworkers = atoi(PQgetvalue(res, 0, 0));
986  max_repslots = atoi(PQgetvalue(res, 1, 0));
987  max_wprocs = atoi(PQgetvalue(res, 2, 0));
988  if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
990 
991  pg_log_debug("subscriber: max_logical_replication_workers: %d",
992  max_lrworkers);
993  pg_log_debug("subscriber: max_replication_slots: %d", max_repslots);
994  pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
995  if (primary_slot_name)
996  pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
997 
998  PQclear(res);
999 
1000  disconnect_database(conn, false);
1001 
1002  if (max_repslots < num_dbs)
1003  {
1004  pg_log_error("subscriber requires %d replication slots, but only %d remain",
1005  num_dbs, max_repslots);
1006  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1007  "max_replication_slots", num_dbs);
1008  failed = true;
1009  }
1010 
1011  if (max_lrworkers < num_dbs)
1012  {
1013  pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1014  num_dbs, max_lrworkers);
1015  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1016  "max_logical_replication_workers", num_dbs);
1017  failed = true;
1018  }
1019 
1020  if (max_wprocs < num_dbs + 1)
1021  {
1022  pg_log_error("subscriber requires %d worker processes, but only %d remain",
1023  num_dbs + 1, max_wprocs);
1024  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1025  "max_worker_processes", num_dbs + 1);
1026  failed = true;
1027  }
1028 
1029  if (failed)
1030  exit(1);
1031 }
1032 
1033 /*
1034  * Drop a specified subscription. This is to avoid duplicate subscriptions on
1035  * the primary (publisher node) and the newly created subscriber. We
1036  * shouldn't drop the associated slot as that would be used by the publisher
1037  * node.
1038  */
1039 static void
1041 {
1042  PQExpBuffer query = createPQExpBuffer();
1043  PGresult *res;
1044 
1045  Assert(conn != NULL);
1046 
1047  /*
1048  * Construct a query string. These commands are allowed to be executed
1049  * within a transaction.
1050  */
1051  appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1052  subname);
1053  appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1054  subname);
1055  appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1056 
1057  pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1058  subname, dbname);
1059 
1060  if (!dry_run)
1061  {
1062  res = PQexec(conn, query->data);
1063 
1065  {
1066  pg_log_error("could not drop a subscription \"%s\" settings: %s",
1068  disconnect_database(conn, true);
1069  }
1070 
1071  PQclear(res);
1072  }
1073 
1074  destroyPQExpBuffer(query);
1075 }
1076 
1077 /*
1078  * Retrieve and drop the pre-existing subscriptions.
1079  */
1080 static void
1082  const struct LogicalRepInfo *dbinfo)
1083 {
1084  PQExpBuffer query = createPQExpBuffer();
1085  char *dbname;
1086  PGresult *res;
1087 
1088  Assert(conn != NULL);
1089 
1091 
1092  appendPQExpBuffer(query,
1093  "SELECT s.subname FROM pg_catalog.pg_subscription s "
1094  "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1095  "WHERE d.datname = %s",
1096  dbname);
1097  res = PQexec(conn, query->data);
1098 
1100  {
1101  pg_log_error("could not obtain pre-existing subscriptions: %s",
1103  disconnect_database(conn, true);
1104  }
1105 
1106  for (int i = 0; i < PQntuples(res); i++)
1108  dbinfo->dbname);
1109 
1110  PQclear(res);
1111  destroyPQExpBuffer(query);
1112 }
1113 
1114 /*
1115  * Create the subscriptions, adjust the initial location for logical
1116  * replication and enable the subscriptions. That's the last step for logical
1117  * replication setup.
1118  */
1119 static void
1120 setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1121 {
1122  for (int i = 0; i < num_dbs; i++)
1123  {
1124  PGconn *conn;
1125 
1126  /* Connect to subscriber. */
1127  conn = connect_database(dbinfo[i].subconninfo, true);
1128 
1129  /*
1130  * We don't need the pre-existing subscriptions on the newly formed
1131  * subscriber. They can connect to other publisher nodes and either
1132  * get some unwarranted data or can lead to ERRORs in connecting to
1133  * such nodes.
1134  */
1136 
1137  /*
1138  * Since the publication was created before the consistent LSN, it is
1139  * available on the subscriber when the physical replica is promoted.
1140  * Remove publications from the subscriber because they have no use.
1141  */
1143 
1145 
1146  /* Set the replication progress to the correct LSN */
1147  set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1148 
1149  /* Enable subscription */
1151 
1152  disconnect_database(conn, false);
1153  }
1154 }
1155 
1156 /*
1157  * Write the required recovery parameters.
1158  */
1159 static void
1160 setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1161 {
1162  PGconn *conn;
1164 
1165  /*
1166  * Although the recovery parameters will be written to the subscriber,
1167  * use a publisher connection. The primary_conninfo is generated using the
1168  * connection settings.
1169  */
1170  conn = connect_database(dbinfo[0].pubconninfo, true);
1171 
1172  /*
1173  * Write recovery parameters.
1174  *
1175  * The subscriber is not running yet. In dry run mode, the recovery
1176  * parameters *won't* be written. An invalid LSN is used for printing
1177  * purposes. Additional recovery parameters are added here. It avoids
1178  * unexpected behavior such as end of recovery as soon as a consistent
1179  * state is reached (recovery_target) and failure due to multiple recovery
1180  * targets (name, time, xid, LSN).
1181  */
1183  appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
1185  "recovery_target_timeline = 'latest'\n");
1187  "recovery_target_inclusive = true\n");
1189  "recovery_target_action = promote\n");
1190  appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
1191  appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
1192  appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
1193 
1194  if (dry_run)
1195  {
1196  appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
1198  "recovery_target_lsn = '%X/%X'\n",
1200  }
1201  else
1202  {
1203  appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1204  lsn);
1206  }
1207  disconnect_database(conn, false);
1208 
1209  pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1210 }
1211 
1212 /*
1213  * Drop physical replication slot on primary if the standby was using it. After
1214  * the transformation, it has no use.
1215  *
1216  * XXX we might not fail here. Instead, we provide a warning so the user
1217  * eventually drops this replication slot later.
1218  */
1219 static void
1221 {
1222  PGconn *conn;
1223 
1224  /* Replication slot does not exist, do nothing */
1225  if (!primary_slot_name)
1226  return;
1227 
1228  conn = connect_database(dbinfo[0].pubconninfo, false);
1229  if (conn != NULL)
1230  {
1231  drop_replication_slot(conn, &dbinfo[0], slotname);
1232  disconnect_database(conn, false);
1233  }
1234  else
1235  {
1236  pg_log_warning("could not drop replication slot \"%s\" on primary",
1237  slotname);
1238  pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1239  }
1240 }
1241 
1242 /*
1243  * Drop failover replication slots on subscriber. After the transformation,
1244  * they have no use.
1245  *
1246  * XXX We do not fail here. Instead, we provide a warning so the user can drop
1247  * them later.
1248  */
1249 static void
1251 {
1252  PGconn *conn;
1253  PGresult *res;
1254 
1255  conn = connect_database(dbinfo[0].subconninfo, false);
1256  if (conn != NULL)
1257  {
1258  /* Get failover replication slot names */
1259  res = PQexec(conn,
1260  "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1261 
1263  {
1264  /* Remove failover replication slots from subscriber */
1265  for (int i = 0; i < PQntuples(res); i++)
1267  }
1268  else
1269  {
1270  pg_log_warning("could not obtain failover replication slot information: %s",
1272  pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1273  }
1274 
1275  PQclear(res);
1276  disconnect_database(conn, false);
1277  }
1278  else
1279  {
1280  pg_log_warning("could not drop failover replication slot");
1281  pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1282  }
1283 }
1284 
1285 /*
1286  * Create a logical replication slot and return an LSN.
1287  *
1288  * CreateReplicationSlot() is not used because it does not provide the one-row
1289  * result set that contains the LSN.
1290  */
1291 static char *
1293 {
1295  PGresult *res = NULL;
1296  const char *slot_name = dbinfo->replslotname;
1297  char *slot_name_esc;
1298  char *lsn = NULL;
1299 
1300  Assert(conn != NULL);
1301 
1302  pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
1303  slot_name, dbinfo->dbname);
1304 
1305  slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1306 
1308  "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
1309  slot_name_esc);
1310 
1311  pg_free(slot_name_esc);
1312 
1313  pg_log_debug("command is: %s", str->data);
1314 
1315  if (!dry_run)
1316  {
1317  res = PQexec(conn, str->data);
1319  {
1320  pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1321  slot_name, dbinfo->dbname,
1323  PQclear(res);
1325  return NULL;
1326  }
1327 
1328  lsn = pg_strdup(PQgetvalue(res, 0, 0));
1329  PQclear(res);
1330  }
1331 
1332  /* For cleanup purposes */
1333  dbinfo->made_replslot = true;
1334 
1336 
1337  return lsn;
1338 }
1339 
1340 static void
1342  const char *slot_name)
1343 {
1345  char *slot_name_esc;
1346  PGresult *res;
1347 
1348  Assert(conn != NULL);
1349 
1350  pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1351  slot_name, dbinfo->dbname);
1352 
1353  slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1354 
1355  appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1356 
1357  pg_free(slot_name_esc);
1358 
1359  pg_log_debug("command is: %s", str->data);
1360 
1361  if (!dry_run)
1362  {
1363  res = PQexec(conn, str->data);
1365  {
1366  pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1367  slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1368  dbinfo->made_replslot = false; /* don't try again. */
1369  }
1370 
1371  PQclear(res);
1372  }
1373 
1375 }
1376 
/*
 * Report why pg_ctl failed and terminate the program.
 *
 * pg_ctl_cmd is the command line that was executed and rc is the status
 * returned by system() for it. A zero status means success: nothing is
 * reported and control returns to the caller. Any other status is decoded
 * (normal exit, termination by signal/exception, or unrecognized), logged
 * together with the failed command, and the program exits with code 1.
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	/* Nothing to report when pg_ctl succeeded. */
	if (rc == 0)
		return;

	if (WIFEXITED(rc))
	{
		/* pg_ctl ran to completion but returned a non-zero exit code. */
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	}
	else if (WIFSIGNALED(rc))
	{
		int			termsig = WTERMSIG(rc);

#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X",
					 termsig);
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 termsig, pg_strsignal(termsig));
#endif
	}
	else
	{
		/* Neither a normal exit nor a signal: report the raw status. */
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);
	}

	/* Always show the command that failed, then give up. */
	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1409 
1410 static void
1411 start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1412  bool restrict_logical_worker)
1413 {
1414  PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1415  int rc;
1416 
1417  appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1418  appendShellString(pg_ctl_cmd, subscriber_dir);
1419  appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1420  if (restricted_access)
1421  {
1422  appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1423 #if !defined(WIN32)
1424 
1425  /*
1426  * An empty listen_addresses list means the server does not listen on
1427  * any IP interfaces; only Unix-domain sockets can be used to connect
1428  * to the server. Prevent external connections to minimize the chance
1429  * of failure.
1430  */
1431  appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1432  if (opt->socket_dir)
1433  appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1434  opt->socket_dir);
1435  appendPQExpBufferChar(pg_ctl_cmd, '"');
1436 #endif
1437  }
1438  if (opt->config_file != NULL)
1439  appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1440  opt->config_file);
1441 
1442  /* Suppress to start logical replication if requested */
1443  if (restrict_logical_worker)
1444  appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1445 
1446  pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1447  rc = system(pg_ctl_cmd->data);
1448  pg_ctl_status(pg_ctl_cmd->data, rc);
1449  standby_running = true;
1450  destroyPQExpBuffer(pg_ctl_cmd);
1451  pg_log_info("server was started");
1452 }
1453 
1454 static void
1456 {
1457  char *pg_ctl_cmd;
1458  int rc;
1459 
1460  pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1461  datadir);
1462  pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1463  rc = system(pg_ctl_cmd);
1464  pg_ctl_status(pg_ctl_cmd, rc);
1465  standby_running = false;
1466  pg_log_info("server was stopped");
1467 }
1468 
1469 /*
1470  * Returns after the server finishes the recovery process.
1471  *
1472  * If recovery_timeout option is set, terminate abnormally without finishing
1473  * the recovery process. By default, it waits forever.
1474  *
1475  * XXX Is the recovery process still in progress? When recovery process has a
1476  * better progress reporting mechanism, it should be added here.
1477  */
1478 static void
1479 wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1480 {
1481  PGconn *conn;
1482  int status = POSTMASTER_STILL_STARTING;
1483  int timer = 0;
1484 
1485  pg_log_info("waiting for the target server to reach the consistent state");
1486 
1487  conn = connect_database(conninfo, true);
1488 
1489  for (;;)
1490  {
1491  bool in_recovery = server_is_in_recovery(conn);
1492 
1493  /*
1494  * Does the recovery process finish? In dry run mode, there is no
1495  * recovery mode. Bail out as the recovery process has ended.
1496  */
1497  if (!in_recovery || dry_run)
1498  {
1499  status = POSTMASTER_READY;
1500  recovery_ended = true;
1501  break;
1502  }
1503 
1504  /* Bail out after recovery_timeout seconds if this option is set */
1505  if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1506  {
1508  pg_log_error("recovery timed out");
1509  disconnect_database(conn, true);
1510  }
1511 
1512  /* Keep waiting */
1514 
1515  timer += WAIT_INTERVAL;
1516  }
1517 
1518  disconnect_database(conn, false);
1519 
1520  if (status == POSTMASTER_STILL_STARTING)
1521  pg_fatal("server did not end recovery");
1522 
1523  pg_log_info("target server reached the consistent state");
1524  pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1525 }
1526 
1527 /*
1528  * Create a publication that includes all tables in the database.
1529  */
1530 static void
1532 {
1534  PGresult *res;
1535  char *ipubname_esc;
1536  char *spubname_esc;
1537 
1538  Assert(conn != NULL);
1539 
1540  ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1541  spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1542 
1543  /* Check if the publication already exists */
1545  "SELECT 1 FROM pg_catalog.pg_publication "
1546  "WHERE pubname = %s",
1547  spubname_esc);
1548  res = PQexec(conn, str->data);
1550  {
1551  pg_log_error("could not obtain publication information: %s",
1553  disconnect_database(conn, true);
1554  }
1555 
1556  if (PQntuples(res) == 1)
1557  {
1558  /*
1559  * Unfortunately, if it reaches this code path, it will always fail
1560  * (unless you decide to change the existing publication name). That's
1561  * bad but it is very unlikely that the user will choose a name with
1562  * pg_createsubscriber_ prefix followed by the exact database oid and
1563  * a random number.
1564  */
1565  pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1566  pg_log_error_hint("Consider renaming this publication before continuing.");
1567  disconnect_database(conn, true);
1568  }
1569 
1570  PQclear(res);
1572 
1573  pg_log_info("creating publication \"%s\" in database \"%s\"",
1575 
1576  appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1577  ipubname_esc);
1578 
1579  pg_log_debug("command is: %s", str->data);
1580 
1581  if (!dry_run)
1582  {
1583  res = PQexec(conn, str->data);
1585  {
1586  pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1588  disconnect_database(conn, true);
1589  }
1590  PQclear(res);
1591  }
1592 
1593  /* For cleanup purposes */
1594  dbinfo->made_publication = true;
1595 
1596  pg_free(ipubname_esc);
1597  pg_free(spubname_esc);
1599 }
1600 
1601 /*
1602  * Remove publication if it couldn't finish all steps.
1603  */
1604 static void
1606 {
1608  PGresult *res;
1609  char *pubname_esc;
1610 
1611  Assert(conn != NULL);
1612 
1613  pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1614 
1615  pg_log_info("dropping publication \"%s\" in database \"%s\"",
1617 
1618  appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1619 
1620  pg_free(pubname_esc);
1621 
1622  pg_log_debug("command is: %s", str->data);
1623 
1624  if (!dry_run)
1625  {
1626  res = PQexec(conn, str->data);
1628  {
1629  pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1631  dbinfo->made_publication = false; /* don't try again. */
1632 
1633  /*
1634  * Don't disconnect and exit here. This routine is used by primary
1635  * (cleanup publication / replication slot due to an error) and
1636  * subscriber (remove the replicated publications). In both cases,
1637  * it can continue and provide instructions for the user to remove
1638  * it later if cleanup fails.
1639  */
1640  }
1641  PQclear(res);
1642  }
1643 
1645 }
1646 
1647 /*
1648  * Create a subscription with some predefined options.
1649  *
1650  * A replication slot was already created in a previous step. Let's use it. It
1651  * is not required to copy data. The subscription will be created but it will
1652  * not be enabled now. That's because the replication progress must be set and
1653  * the replication origin name (one of the function arguments) contains the
1654  * subscription OID in its name. Once the subscription is created,
1655  * set_replication_progress() can obtain the chosen origin name and set up its
1656  * initial location.
1657  */
1658 static void
1660 {
1662  PGresult *res;
1663  char *pubname_esc;
1664  char *subname_esc;
1665  char *pubconninfo_esc;
1666  char *replslotname_esc;
1667 
1668  Assert(conn != NULL);
1669 
1670  pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1671  subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1672  pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1673  replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1674 
1675  pg_log_info("creating subscription \"%s\" in database \"%s\"",
1677 
1679  "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1680  "WITH (create_slot = false, enabled = false, "
1681  "slot_name = %s, copy_data = false)",
1682  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
1683 
1684  pg_free(pubname_esc);
1685  pg_free(subname_esc);
1686  pg_free(pubconninfo_esc);
1687  pg_free(replslotname_esc);
1688 
1689  pg_log_debug("command is: %s", str->data);
1690 
1691  if (!dry_run)
1692  {
1693  res = PQexec(conn, str->data);
1695  {
1696  pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1698  disconnect_database(conn, true);
1699  }
1700  PQclear(res);
1701  }
1702 
1704 }
1705 
1706 /*
1707  * Sets the replication progress to the consistent LSN.
1708  *
1709  * The subscriber caught up to the consistent LSN provided by the last
1710  * replication slot that was created. The goal is to set up the initial
1711  * location for the logical replication, which is the exact LSN at which the
1712  * subscriber was promoted. Once the subscription is enabled it will start
1713  * streaming from that location onwards. In dry run mode, the subscription OID
1714  * and LSN are set to invalid values for printing purposes.
1715  */
1716 static void
1717 set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1718 {
1720  PGresult *res;
1721  Oid suboid;
1722  char *subname;
1723  char *dbname;
1724  char *originname;
1725  char *lsnstr;
1726 
1727  Assert(conn != NULL);
1728 
1731 
1733  "SELECT s.oid FROM pg_catalog.pg_subscription s "
1734  "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1735  "WHERE s.subname = %s AND d.datname = %s",
1736  subname, dbname);
1737 
1738  res = PQexec(conn, str->data);
1740  {
1741  pg_log_error("could not obtain subscription OID: %s",
1743  disconnect_database(conn, true);
1744  }
1745 
1746  if (PQntuples(res) != 1 && !dry_run)
1747  {
1748  pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1749  PQntuples(res), 1);
1750  disconnect_database(conn, true);
1751  }
1752 
1753  if (dry_run)
1754  {
1755  suboid = InvalidOid;
1756  lsnstr = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1757  }
1758  else
1759  {
1760  suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1761  lsnstr = psprintf("%s", lsn);
1762  }
1763 
1764  PQclear(res);
1765 
1766  /*
1767  * The origin name is defined as pg_%u. %u is the subscription OID. See
1768  * ApplyWorkerMain().
1769  */
1770  originname = psprintf("pg_%u", suboid);
1771 
1772  pg_log_info("setting the replication progress (node name \"%s\" ; LSN %s) in database \"%s\"",
1773  originname, lsnstr, dbinfo->dbname);
1774 
1777  "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1778  originname, lsnstr);
1779 
1780  pg_log_debug("command is: %s", str->data);
1781 
1782  if (!dry_run)
1783  {
1784  res = PQexec(conn, str->data);
1786  {
1787  pg_log_error("could not set replication progress for the subscription \"%s\": %s",
1789  disconnect_database(conn, true);
1790  }
1791  PQclear(res);
1792  }
1793 
1794  pg_free(subname);
1795  pg_free(dbname);
1796  pg_free(originname);
1797  pg_free(lsnstr);
1799 }
1800 
1801 /*
1802  * Enables the subscription.
1803  *
1804  * The subscription was created in a previous step but it was disabled. After
1805  * adjusting the initial logical replication location, enable the subscription.
1806  */
1807 static void
1809 {
1811  PGresult *res;
1812  char *subname;
1813 
1814  Assert(conn != NULL);
1815 
1817 
1818  pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1820 
1821  appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1822 
1823  pg_log_debug("command is: %s", str->data);
1824 
1825  if (!dry_run)
1826  {
1827  res = PQexec(conn, str->data);
1829  {
1830  pg_log_error("could not enable subscription \"%s\": %s",
1832  disconnect_database(conn, true);
1833  }
1834 
1835  PQclear(res);
1836  }
1837 
1838  pg_free(subname);
1840 }
1841 
1842 int
1843 main(int argc, char **argv)
1844 {
1845  static struct option long_options[] =
1846  {
1847  {"database", required_argument, NULL, 'd'},
1848  {"pgdata", required_argument, NULL, 'D'},
1849  {"dry-run", no_argument, NULL, 'n'},
1850  {"subscriber-port", required_argument, NULL, 'p'},
1851  {"publisher-server", required_argument, NULL, 'P'},
1852  {"socket-directory", required_argument, NULL, 's'},
1853  {"recovery-timeout", required_argument, NULL, 't'},
1854  {"subscriber-username", required_argument, NULL, 'U'},
1855  {"verbose", no_argument, NULL, 'v'},
1856  {"version", no_argument, NULL, 'V'},
1857  {"help", no_argument, NULL, '?'},
1858  {"config-file", required_argument, NULL, 1},
1859  {"publication", required_argument, NULL, 2},
1860  {"replication-slot", required_argument, NULL, 3},
1861  {"subscription", required_argument, NULL, 4},
1862  {NULL, 0, NULL, 0}
1863  };
1864 
1865  struct CreateSubscriberOptions opt = {0};
1866 
1867  int c;
1868  int option_index;
1869 
1870  char *pub_base_conninfo;
1871  char *sub_base_conninfo;
1872  char *dbname_conninfo = NULL;
1873 
1874  uint64 pub_sysid;
1875  uint64 sub_sysid;
1876  struct stat statbuf;
1877 
1878  char *consistent_lsn;
1879 
1880  char pidfile[MAXPGPATH];
1881 
1882  pg_logging_init(argv[0]);
1884  progname = get_progname(argv[0]);
1885  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_createsubscriber"));
1886 
1887  if (argc > 1)
1888  {
1889  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
1890  {
1891  usage();
1892  exit(0);
1893  }
1894  else if (strcmp(argv[1], "-V") == 0
1895  || strcmp(argv[1], "--version") == 0)
1896  {
1897  puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
1898  exit(0);
1899  }
1900  }
1901 
1902  /* Default settings */
1903  subscriber_dir = NULL;
1904  opt.config_file = NULL;
1905  opt.pub_conninfo_str = NULL;
1906  opt.socket_dir = NULL;
1907  opt.sub_port = DEFAULT_SUB_PORT;
1908  opt.sub_username = NULL;
1910  {
1911  0
1912  };
1913  opt.recovery_timeout = 0;
1914 
1915  /*
1916  * Don't allow it to be run as root. It uses pg_ctl which does not allow
1917  * it either.
1918  */
1919 #ifndef WIN32
1920  if (geteuid() == 0)
1921  {
1922  pg_log_error("cannot be executed by \"root\"");
1923  pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
1924  progname);
1925  exit(1);
1926  }
1927 #endif
1928 
1930 
1931  while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
1932  long_options, &option_index)) != -1)
1933  {
1934  switch (c)
1935  {
1936  case 'd':
1938  {
1940  num_dbs++;
1941  }
1942  else
1943  {
1944  pg_log_error("duplicate database \"%s\"", optarg);
1945  exit(1);
1946  }
1947  break;
1948  case 'D':
1951  break;
1952  case 'n':
1953  dry_run = true;
1954  break;
1955  case 'p':
1956  opt.sub_port = pg_strdup(optarg);
1957  break;
1958  case 'P':
1960  break;
1961  case 's':
1962  opt.socket_dir = pg_strdup(optarg);
1964  break;
1965  case 't':
1966  opt.recovery_timeout = atoi(optarg);
1967  break;
1968  case 'U':
1969  opt.sub_username = pg_strdup(optarg);
1970  break;
1971  case 'v':
1973  break;
1974  case 1:
1975  opt.config_file = pg_strdup(optarg);
1976  break;
1977  case 2:
1979  {
1981  num_pubs++;
1982  }
1983  else
1984  {
1985  pg_log_error("duplicate publication \"%s\"", optarg);
1986  exit(1);
1987  }
1988  break;
1989  case 3:
1991  {
1993  num_replslots++;
1994  }
1995  else
1996  {
1997  pg_log_error("duplicate replication slot \"%s\"", optarg);
1998  exit(1);
1999  }
2000  break;
2001  case 4:
2003  {
2005  num_subs++;
2006  }
2007  else
2008  {
2009  pg_log_error("duplicate subscription \"%s\"", optarg);
2010  exit(1);
2011  }
2012  break;
2013  default:
2014  /* getopt_long already emitted a complaint */
2015  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2016  exit(1);
2017  }
2018  }
2019 
2020  /* Any non-option arguments? */
2021  if (optind < argc)
2022  {
2023  pg_log_error("too many command-line arguments (first is \"%s\")",
2024  argv[optind]);
2025  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2026  exit(1);
2027  }
2028 
2029  /* Required arguments */
2030  if (subscriber_dir == NULL)
2031  {
2032  pg_log_error("no subscriber data directory specified");
2033  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2034  exit(1);
2035  }
2036 
2037  /* If socket directory is not provided, use the current directory */
2038  if (opt.socket_dir == NULL)
2039  {
2040  char cwd[MAXPGPATH];
2041 
2042  if (!getcwd(cwd, MAXPGPATH))
2043  pg_fatal("could not determine current directory");
2044  opt.socket_dir = pg_strdup(cwd);
2046  }
2047 
2048  /*
2049  * Parse connection string. Build a base connection string that might be
2050  * reused by multiple databases.
2051  */
2052  if (opt.pub_conninfo_str == NULL)
2053  {
2054  /*
2055  * TODO use primary_conninfo (if available) from subscriber and
2056  * extract publisher connection string. Assume that there are
2057  * identical entries for physical and logical replication. If there is
2058  * not, we would fail anyway.
2059  */
2060  pg_log_error("no publisher connection string specified");
2061  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2062  exit(1);
2063  }
2064  pg_log_info("validating connection string on publisher");
2065  pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2066  &dbname_conninfo);
2067  if (pub_base_conninfo == NULL)
2068  exit(1);
2069 
2070  pg_log_info("validating connection string on subscriber");
2071  sub_base_conninfo = get_sub_conninfo(&opt);
2072 
2073  if (opt.database_names.head == NULL)
2074  {
2075  pg_log_info("no database was specified");
2076 
2077  /*
2078  * If --database option is not provided, try to obtain the dbname from
2079  * the publisher conninfo. If dbname parameter is not available, error
2080  * out.
2081  */
2082  if (dbname_conninfo)
2083  {
2084  simple_string_list_append(&opt.database_names, dbname_conninfo);
2085  num_dbs++;
2086 
2087  pg_log_info("database \"%s\" was extracted from the publisher connection string",
2088  dbname_conninfo);
2089  }
2090  else
2091  {
2092  pg_log_error("no database name specified");
2093  pg_log_error_hint("Try \"%s --help\" for more information.",
2094  progname);
2095  exit(1);
2096  }
2097  }
2098 
2099  /* Number of object names must match number of databases */
2100  if (num_pubs > 0 && num_pubs != num_dbs)
2101  {
2102  pg_log_error("wrong number of publication names");
2103  pg_log_error_hint("Number of publication names (%d) must match number of database names (%d).",
2104  num_pubs, num_dbs);
2105  exit(1);
2106  }
2107  if (num_subs > 0 && num_subs != num_dbs)
2108  {
2109  pg_log_error("wrong number of subscription names");
2110  pg_log_error_hint("Number of subscription names (%d) must match number of database names (%d).",
2111  num_subs, num_dbs);
2112  exit(1);
2113  }
2114  if (num_replslots > 0 && num_replslots != num_dbs)
2115  {
2116  pg_log_error("wrong number of replication slot names");
2117  pg_log_error_hint("Number of replication slot names (%d) must match number of database names (%d).",
2119  exit(1);
2120  }
2121 
2122  /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2123  pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2124  pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2125 
2126  /* Rudimentary check for a data directory */
2128 
2129  /*
2130  * Store database information for publisher and subscriber. It should be
2131  * called before atexit() because its return is used in the
2132  * cleanup_objects_atexit().
2133  */
2134  dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2135 
2136  /* Register a function to clean up objects in case of failure */
2137  atexit(cleanup_objects_atexit);
2138 
2139  /*
2140  * Check if the subscriber data directory has the same system identifier
2141  * as the publisher data directory.
2142  */
2143  pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
2144  sub_sysid = get_standby_sysid(subscriber_dir);
2145  if (pub_sysid != sub_sysid)
2146  pg_fatal("subscriber data directory is not a copy of the source database cluster");
2147 
2148  /* Subscriber PID file */
2149  snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2150 
2151  /*
2152  * The standby server must not be running. If the server is started under
2153  * service manager and pg_createsubscriber stops it, the service manager
2154  * might react to this action and start the server again. Therefore,
2155  * refuse to proceed if the server is running to avoid possible failures.
2156  */
2157  if (stat(pidfile, &statbuf) == 0)
2158  {
2159  pg_log_error("standby is up and running");
2160  pg_log_error_hint("Stop the standby and try again.");
2161  exit(1);
2162  }
2163 
2164  /*
2165  * Start a short-lived standby server with temporary parameters (provided
2166  * by command-line options). The goal is to avoid connections during the
2167  * transformation steps.
2168  */
2169  pg_log_info("starting the standby with command-line options");
2170  start_standby_server(&opt, true, false);
2171 
2172  /* Check if the standby server is ready for logical replication */
2174 
2175  /* Check if the primary server is ready for logical replication */
2177 
2178  /*
2179  * Stop the target server. The recovery process requires that the server
2180  * reaches a consistent state before targeting the recovery stop point.
2181  * Make sure a consistent state is reached (stopping the target server
2182  * guarantees it) *before* creating the replication slots in
2183  * setup_publisher().
2184  */
2185  pg_log_info("stopping the subscriber");
2187 
2188  /*
2189  * Create the required objects for each database on publisher. This step
2190  * is here mainly because if we stop the standby we cannot verify if the
2191  * primary slot is in use. We could use an extra connection for it but it
2192  * doesn't seem worth it.
2193  */
2194  consistent_lsn = setup_publisher(dbinfo);
2195 
2196  /* Write the required recovery parameters */
2197  setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
2198 
2199  /*
2200  * Start subscriber so the recovery parameters will take effect. Wait
2201  * until accepting connections. We don't want to start logical replication
2202  * during setup.
2203  */
2204  pg_log_info("starting the subscriber");
2205  start_standby_server(&opt, true, true);
2206 
2207  /* Wait for the subscriber to be promoted */
2208  wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
2209 
2210  /*
2211  * Create the subscription for each database on subscriber. It does not
2212  * enable it immediately because it needs to adjust the replication start
2213  * point to the LSN reported by setup_publisher(). It also cleans up
2214  * publications created by this tool and replication to the standby.
2215  */
2216  setup_subscriber(dbinfo, consistent_lsn);
2217 
2218  /* Remove primary_slot_name if it exists on primary */
2220 
2221  /* Remove failover replication slots if they exist on subscriber */
2223 
2224  /* Stop the subscriber */
2225  pg_log_info("stopping the subscriber");
2227 
2228  /* Change system identifier from subscriber */
2230 
2231  success = true;
2232 
2233  pg_log_info("Done!");
2234 
2235  return 0;
2236 }
unsigned int uint32
Definition: c.h:506
#define Assert(condition)
Definition: c.h:858
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1214
#define strtou64(str, endptr, base)
Definition: c.h:1298
int find_my_exec(const char *argv0, char *retpath)
Definition: exec.c:160
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:448
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition: exec.c:329
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define _(x)
Definition: elog.c:90
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:5752
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7004
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7171
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7118
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4892
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:744
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4310
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4304
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define pg_malloc_array(type, count)
Definition: fe_memutils.h:44
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:24
#define required_argument
Definition: getopt_long.h:25
const char * str
long val
Definition: informix.c:670
int i
Definition: isn.c:73
@ CONNECTION_OK
Definition: libpq-fe.h:60
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:99
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:102
exit(1)
void pg_logging_increase_verbosity(void)
Definition: logging.c:184
void pg_logging_init(const char *argv0)
Definition: logging.c:83
void pg_logging_set_level(enum pg_log_level new_level)
Definition: logging.c:175
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_info(...)
Definition: logging.h:124
#define pg_log_warning_hint(...)
Definition: logging.h:121
#define pg_log_info_hint(...)
Definition: logging.h:130
@ PG_LOG_WARNING
Definition: logging.h:38
#define pg_log_warning_detail(...)
Definition: logging.h:118
#define pg_log_error_detail(...)
Definition: logging.h:109
#define pg_log_debug(...)
Definition: logging.h:133
#define pg_fatal(...)
static PQExpBuffer recoveryconfcontents
#define MAXPGPATH
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
#define WAIT_INTERVAL
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static struct LogicalRepInfo * dbinfo
static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
static void stop_standby_server(const char *datadir)
static char * pg_ctl_path
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static char * get_exec_path(const char *argv0, const char *progname)
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static char * subscriber_dir
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
int main(int argc, char **argv)
static char * primary_slot_name
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static pg_prng_state prng_state
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
static void check_data_directory(const char *datadir)
static void usage()
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
@ POSTMASTER_READY
@ POSTMASTER_STILL_STARTING
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
#define USEC_PER_SEC
static char * get_base_conninfo(const char *conninfo, char **dbname)
static bool success
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static bool recovery_ended
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static void drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void disconnect_database(PGconn *conn, bool exit_on_error)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static bool dry_run
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static char * pg_resetwal_path
static char * generate_object_name(PGconn *conn)
static const char * progname
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
static char * argv0
Definition: pg_ctl.c:93
static char * exec_path
Definition: pg_ctl.c:88
PGDLLIMPORT int optind
Definition: getopt.c:50
PGDLLIMPORT char * optarg
Definition: getopt.c:52
uint32 pg_prng_uint32(pg_prng_state *state)
Definition: pg_prng.c:227
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition: pg_prng.c:89
char * datadir
NameData subname
static char * buf
Definition: pg_test_fsync.c:73
#define pg_log_warning(...)
Definition: pgfnames.c:24
void canonicalize_path(char *path)
Definition: path.c:264
const char * get_progname(const char *argv0)
Definition: path.c:574
const char * pg_strsignal(int signum)
Definition: pgstrsignal.c:39
#define snprintf
Definition: port.h:238
#define DEVNULL
Definition: port.h:160
#define printf(...)
Definition: port.h:244
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
char * c
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
Definition: recovery_gen.c:124
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
Definition: recovery_gen.c:27
void get_restricted_token(void)
void pg_usleep(long microsec)
Definition: signal.c:53
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition: simple_list.c:87
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition: simple_list.c:63
struct SimpleStringList SimpleStringList
char * dbname
Definition: streamutil.c:52
PGconn * conn
Definition: streamutil.c:55
void appendShellString(PQExpBuffer buf, const char *str)
Definition: string_utils.c:429
void appendConnStrVal(PQExpBuffer buf, const char *str)
Definition: string_utils.c:545
uint64 system_identifier
Definition: pg_control.h:110
SimpleStringList database_names
SimpleStringList replslot_names
char val[FLEXIBLE_ARRAY_MEMBER]
Definition: simple_list.h:37
struct SimpleStringListCell * next
Definition: simple_list.h:34
SimpleStringListCell * head
Definition: simple_list.h:42
#define stat
Definition: win32_port.h:284
#define WIFEXITED(w)
Definition: win32_port.h:152
#define WIFSIGNALED(w)
Definition: win32_port.h:153
#define WTERMSIG(w)
Definition: win32_port.h:155
#define WEXITSTATUS(w)
Definition: win32_port.h:154
int gettimeofday(struct timeval *tp, void *tzp)
int wal_level
Definition: xlog.c:129
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28