PostgreSQL Source Code  git master
pg_createsubscriber.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * pg_createsubscriber.c
4  * Create a new logical replica from a standby server
5  *
6  * Copyright (C) 2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/bin/pg_basebackup/pg_createsubscriber.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 
14 #include "postgres_fe.h"
15 
16 #include <sys/stat.h>
17 #include <sys/time.h>
18 #include <sys/wait.h>
19 #include <time.h>
20 
21 #include "common/connect.h"
23 #include "common/logging.h"
24 #include "common/pg_prng.h"
26 #include "fe_utils/recovery_gen.h"
27 #include "fe_utils/simple_list.h"
28 #include "fe_utils/string_utils.h"
29 #include "getopt_long.h"
30 
/* Default subscriber port number (shown by usage() as the -p default) */
31 #define DEFAULT_SUB_PORT "50432"
32 
33 /* Command-line options (presumably filled from the switches listed in usage() -- confirm) */
35 {
36  char *config_file; /* configuration file */
37  char *pub_conninfo_str; /* publisher connection string */
38  char *socket_dir; /* directory for Unix-domain socket, if any */
39  char *sub_port; /* subscriber port number */
40  const char *sub_username; /* subscriber username, or NULL */
41  SimpleStringList database_names; /* list of database names */
42  SimpleStringList pub_names; /* list of publication names */
43  SimpleStringList sub_names; /* list of subscription names */
44  SimpleStringList replslot_names; /* list of replication slot names */
45  int recovery_timeout; /* stop recovery after this time (seconds, per usage() -t) */
46 };
47 
/*
 * Per-database state.  store_pub_sub_info() fills one entry per database
 * name; a publication or subscription name left NULL there is generated
 * later in setup_publisher().  The made_* flags record what was created on
 * the publisher so that cleanup_objects_atexit() can drop or report it after
 * a failure.
 */
49 {
50  char *dbname; /* database name */
51  char *pubconninfo; /* publisher connection string */
52  char *subconninfo; /* subscriber connection string */
53  char *pubname; /* publication name */
54  char *subname; /* subscription name */
55  char *replslotname; /* replication slot name */
56 
57  bool made_replslot; /* replication slot was created */
58  bool made_publication; /* publication was created */
59 };
60 
/* Prototypes of the file-local (static) functions. */
61 static void cleanup_objects_atexit(void);
62 static void usage();
/* NOTE(review): usage() is declared with an empty (old-style) parameter list but defined as usage(void) */
63 static char *get_base_conninfo(const char *conninfo, char **dbname);
64 static char *get_sub_conninfo(const struct CreateSubscriberOptions *opt);
65 static char *get_exec_path(const char *argv0, const char *progname);
66 static void check_data_directory(const char *datadir);
67 static char *concat_conninfo_dbname(const char *conninfo, const char *dbname);
68 static struct LogicalRepInfo *store_pub_sub_info(const struct CreateSubscriberOptions *opt,
69  const char *pub_base_conninfo,
70  const char *sub_base_conninfo);
71 static PGconn *connect_database(const char *conninfo, bool exit_on_error);
72 static void disconnect_database(PGconn *conn, bool exit_on_error);
73 static uint64 get_primary_sysid(const char *conninfo);
74 static uint64 get_standby_sysid(const char *datadir);
75 static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt);
76 static bool server_is_in_recovery(PGconn *conn);
77 static char *generate_object_name(PGconn *conn);
78 static void check_publisher(const struct LogicalRepInfo *dbinfo);
79 static char *setup_publisher(struct LogicalRepInfo *dbinfo);
80 static void check_subscriber(const struct LogicalRepInfo *dbinfo);
81 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
82  const char *consistent_lsn);
83 static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
84  const char *lsn);
86  const char *slotname);
89  struct LogicalRepInfo *dbinfo);
91  const char *slot_name);
92 static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
93 static void start_standby_server(const struct CreateSubscriberOptions *opt,
94  bool restricted_access,
95  bool restrict_logical_worker);
96 static void stop_standby_server(const char *datadir);
97 static void wait_for_end_recovery(const char *conninfo,
98  const struct CreateSubscriberOptions *opt);
99 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
100 static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
101 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
102 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
103  const char *lsn);
104 static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
106  const struct LogicalRepInfo *dbinfo);
107 static void drop_existing_subscriptions(PGconn *conn, const char *subname,
108  const char *dbname);
109 
110 #define USEC_PER_SEC 1000000
111 #define WAIT_INTERVAL 1 /* 1 second */
112 
/* Program name; printed by usage() and used as fallback_application_name */
113 static const char *progname;
114 
/* primary_slot_name setting of the standby, if non-empty (see check_subscriber()) */
115 static char *primary_slot_name = NULL;
/* When true, steps guarded by !dry_run are skipped and only logged */
116 static bool dry_run = false;
117 
/* When true, cleanup_objects_atexit() returns without doing anything */
118 static bool success = false;
119 
/* Per-database information, num_dbs entries (see store_pub_sub_info()) */
120 static struct LogicalRepInfo *dbinfo;
121 static int num_dbs = 0; /* number of specified databases */
122 static int num_pubs = 0; /* number of specified publications */
123 static int num_subs = 0; /* number of specified subscriptions */
124 static int num_replslots = 0; /* number of specified replication slots */
125 
127 
/* Paths of the pg_ctl and pg_resetwal executables (presumably from get_exec_path()) */
128 static char *pg_ctl_path = NULL;
129 static char *pg_resetwal_path = NULL;
130 
131 /* standby / subscriber data directory */
132 static char *subscriber_dir = NULL;
133 
/* Progress flags consulted by cleanup_objects_atexit() */
134 static bool recovery_ended = false;
135 static bool standby_running = false;
136 
138 {
141 };
142 
143 
144 /*
145  * Cleanup objects that were created by pg_createsubscriber if there is an
146  * error.
147  *
148  * Publications and replication slots are created on the primary. Depending
149  * on the step at which it failed, it removes the objects already created, if
150  * possible (sometimes it won't work due to a connection issue). When no
 * connection to the primary can be established, a warning is emitted for
 * each publication / replication slot left behind so the user can drop it.
151  * There is no cleanup on the target server. The steps on the target server are
152  * executed *after* promotion, hence, at this point, a failure means the
153  * physical replica must be recreated before starting again.
 *
 * Returns immediately when 'success' is set.  If 'recovery_ended' is set, the
 * user is warned that the target can no longer act as a physical replica.
 * NOTE(review): the statement guarded by 'standby_running' at the end is not
 * visible in this rendering; presumably it stops the standby -- confirm.
154  */
155 static void
157 {
158  if (success)
159  return;
160 
161  /*
162  * If the server is promoted, there is no way to use the current setup
163  * again. Warn the user that a new replication setup should be done before
164  * trying again.
165  */
166  if (recovery_ended)
167  {
168  pg_log_warning("failed after the end of recovery");
169  pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
170  "You must recreate the physical replica before continuing.");
171  }
172 
173  for (int i = 0; i < num_dbs; i++)
174  {
176  {
177  PGconn *conn;
178 
180  if (conn != NULL)
181  {
184  if (dbinfo[i].made_replslot)
186  disconnect_database(conn, false);
187  }
188  else
189  {
190  /*
191  * If a connection could not be established, inform the user
192  * that some objects were left on primary and should be
193  * removed before trying again.
194  */
196  {
197  pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
199  pg_log_warning_hint("Drop this publication before trying again.");
200  }
201  if (dbinfo[i].made_replslot)
202  {
203  pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
205  pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
206  }
207  }
208  }
209  }
210 
211  if (standby_running)
213 }
214 
215 static void
216 usage(void)
217 {
218  printf(_("%s creates a new logical replica from a standby server.\n\n"),
219  progname);
220  printf(_("Usage:\n"));
221  printf(_(" %s [OPTION]...\n"), progname);
222  printf(_("\nOptions:\n"));
223  printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
224  printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
225  printf(_(" -n, --dry-run dry run, just show what would be done\n"));
226  printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
227  printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
228  printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
229  printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
230  printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
231  printf(_(" -v, --verbose output verbose messages\n"));
232  printf(_(" --config-file=FILENAME use specified main server configuration\n"
233  " file when running target cluster\n"));
234  printf(_(" --publication=NAME publication name\n"));
235  printf(_(" --replication-slot=NAME replication slot name\n"));
236  printf(_(" --subscription=NAME subscription name\n"));
237  printf(_(" -V, --version output version information, then exit\n"));
238  printf(_(" -?, --help show this help, then exit\n"));
239  printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
240  printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
241 }
242 
243 /*
244  * Subroutine to append "keyword=value" to a connection string,
245  * with proper quoting of the value. (We assume keywords don't need that.)
 *
 * buf may already hold earlier items; the new item is appended after them
 * (the buf->len > 0 test below deals with an already non-empty buffer).
246  */
247 static void
248 appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
249 {
250  if (buf->len > 0)
252  appendPQExpBufferStr(buf, keyword);
255 }
256 
257 /*
258  * Validate a connection string. Returns a base connection string that is a
259  * connection string without a database name.
260  *
261  * Since we might process multiple databases, each database name will be
262  * appended to this base connection string to provide a final connection
263  * string. If the second argument (dbname) is not null, returns dbname if the
264  * provided connection string contains it.
 *
 * Options whose value is NULL or empty are left out.  If conninfo cannot be
 * parsed, the parse error is logged and NULL is returned (*dbname is not
 * touched in that case).
265  *
266  * It is the caller's responsibility to free the returned connection string and
267  * dbname.
268  */
269 static char *
270 get_base_conninfo(const char *conninfo, char **dbname)
271 {
273  PQconninfoOption *conn_opts;
274  PQconninfoOption *conn_opt;
275  char *errmsg = NULL;
276  char *ret;
277 
278  conn_opts = PQconninfoParse(conninfo, &errmsg);
279  if (conn_opts == NULL)
280  {
281  pg_log_error("could not parse connection string: %s", errmsg);
282  PQfreemem(errmsg);
283  return NULL;
284  }
285 
287  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
288  {
289  if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
290  {
 /* dbname is reported to the caller instead of being kept in the base string */
291  if (strcmp(conn_opt->keyword, "dbname") == 0)
292  {
293  if (dbname)
294  *dbname = pg_strdup(conn_opt->val);
295  continue;
296  }
297  appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
298  }
299  }
300 
301  ret = pg_strdup(buf->data);
302 
304  PQconninfoFree(conn_opts);
305 
306  return ret;
307 }
308 
309 /*
310  * Build a subscriber connection string. Only a few parameters are supported
311  * since it starts a server with restricted access.
 *
 * The result holds port, host (the socket directory; omitted on Windows),
 * user (only when a subscriber username was given) and
 * fallback_application_name (progname).  The returned string is a newly
 * allocated copy.
312  */
313 static char *
315 {
317  char *ret;
318 
319  appendConnStrItem(buf, "port", opt->sub_port);
320 #if !defined(WIN32)
321  appendConnStrItem(buf, "host", opt->socket_dir);
322 #endif
323  if (opt->sub_username != NULL)
324  appendConnStrItem(buf, "user", opt->sub_username);
325  appendConnStrItem(buf, "fallback_application_name", progname);
326 
327  ret = pg_strdup(buf->data);
328 
330 
331  return ret;
332 }
333 
334 /*
335  * Verify if a PostgreSQL binary (progname) is available in the same directory as
336  * pg_createsubscriber and it has the same version. It returns the absolute
337  * path of the progname.
 *
 * Does not return on failure: pg_fatal() is called when the program is not
 * found (find_other_exec() returned -1) or when its version string differs
 * (any other negative result).
 *
 * NOTE(review): the parameter 'progname' shadows the file-level variable of
 * the same name, and 'versionstr' (psprintf) is never freed.
338  */
339 static char *
340 get_exec_path(const char *argv0, const char *progname)
341 {
342  char *versionstr;
343  char *exec_path;
344  int ret;
345 
 /* expected "--version" output of the program */
346  versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
348  ret = find_other_exec(argv0, progname, versionstr, exec_path);
349 
350  if (ret < 0)
351  {
352  char full_path[MAXPGPATH];
353 
 /* fall back to the bare program name if our own path is unknown */
354  if (find_my_exec(argv0, full_path) < 0)
355  strlcpy(full_path, progname, sizeof(full_path));
356 
357  if (ret == -1)
358  pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
359  progname, "pg_createsubscriber", full_path);
360  else
361  pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
362  progname, full_path, "pg_createsubscriber");
363  }
364 
365  pg_log_debug("%s path is: %s", progname, exec_path);
366 
367  return exec_path;
368 }
369 
370 /*
371  * Is it a cluster directory? These are only preliminary checks, far from
372  * an accurate one. If it is not a clone from the publisher, it will
373  * eventually fail in a future step.
 *
 * Exits via pg_fatal() when datadir does not exist or cannot be accessed, or
 * when it contains no PG_VERSION file.
 * NOTE(review): a stat() failure on PG_VERSION with an errno other than
 * ENOENT is silently accepted, and the snprintf() result is not checked for
 * truncation -- confirm whether that is intended.
374  */
375 static void
377 {
378  struct stat statbuf;
379  char versionfile[MAXPGPATH];
380 
381  pg_log_info("checking if directory \"%s\" is a cluster data directory",
382  datadir);
383 
384  if (stat(datadir, &statbuf) != 0)
385  {
386  if (errno == ENOENT)
387  pg_fatal("data directory \"%s\" does not exist", datadir);
388  else
389  pg_fatal("could not access directory \"%s\": %m", datadir);
390  }
391 
392  snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
393  if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
394  {
395  pg_fatal("directory \"%s\" is not a database cluster directory",
396  datadir);
397  }
398 }
399 
400 /*
401  * Append a database name to a base connection string.
402  *
403  * dbname is the only parameter that changes so it is not included in the base
404  * connection string. This function concatenates dbname to build a "real"
405  * connection string.
 *
 * conninfo must not be NULL.  The dbname value is quoted by
 * appendConnStrItem().  Returns a newly allocated string.
406  */
407 static char *
408 concat_conninfo_dbname(const char *conninfo, const char *dbname)
409 {
411  char *ret;
412 
413  Assert(conninfo != NULL);
414 
415  appendPQExpBufferStr(buf, conninfo);
416  appendConnStrItem(buf, "dbname", dbname);
417 
418  ret = pg_strdup(buf->data);
420 
421  return ret;
422 }
423 
424 /*
425  * Store publication and subscription information.
426  *
427  * If publication, replication slot and subscription names were specified,
428  * store them here. Otherwise, a generated name will be assigned to the object in
429  * setup_publisher().
 *
 * The i-th publication, subscription and replication slot name is paired
 * with the i-th database.  The name fields point into the option lists (they
 * are not copied); the two connection strings are newly allocated by
 * concat_conninfo_dbname().  The made_* flags start out false.
 *
 * NOTE(review): when a name list is non-empty it must have at least as many
 * entries as database_names; otherwise pubcell/subcell/replslotcell become
 * NULL and are dereferenced.  Presumably the caller checks the counts --
 * confirm.  The local 'dbinfo' shadows the file-level variable.
430  */
431 static struct LogicalRepInfo *
433  const char *pub_base_conninfo,
434  const char *sub_base_conninfo)
435 {
436  struct LogicalRepInfo *dbinfo;
437  SimpleStringListCell *pubcell = NULL;
438  SimpleStringListCell *subcell = NULL;
439  SimpleStringListCell *replslotcell = NULL;
440  int i = 0;
441 
443 
444  if (num_pubs > 0)
445  pubcell = opt->pub_names.head;
446  if (num_subs > 0)
447  subcell = opt->sub_names.head;
448  if (num_replslots > 0)
449  replslotcell = opt->replslot_names.head;
450 
451  for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
452  {
453  char *conninfo;
454 
455  /* Fill publisher attributes */
456  conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
457  dbinfo[i].pubconninfo = conninfo;
458  dbinfo[i].dbname = cell->val;
459  if (num_pubs > 0)
460  dbinfo[i].pubname = pubcell->val;
461  else
462  dbinfo[i].pubname = NULL;
463  if (num_replslots > 0)
464  dbinfo[i].replslotname = replslotcell->val;
465  else
466  dbinfo[i].replslotname = NULL;
467  dbinfo[i].made_replslot = false;
468  dbinfo[i].made_publication = false;
469  /* Fill subscriber attributes */
470  conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
471  dbinfo[i].subconninfo = conninfo;
472  if (num_subs > 0)
473  dbinfo[i].subname = subcell->val;
474  else
475  dbinfo[i].subname = NULL;
476  /* Other fields will be filled later */
477 
478  pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
479  dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
480  dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
481  dbinfo[i].pubconninfo);
482  pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
483  dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
484  dbinfo[i].subconninfo);
485 
 /* advance the name lists in lockstep with the database list */
486  if (num_pubs > 0)
487  pubcell = pubcell->next;
488  if (num_subs > 0)
489  subcell = subcell->next;
490  if (num_replslots > 0)
491  replslotcell = replslotcell->next;
492 
493  i++;
494  }
495 
496  return dbinfo;
497 }
498 
499 /*
500  * Open a new connection to conninfo and secure its search_path.
501  *
 * On any failure an error is logged and the connection is closed; then, if
 * exit_on_error is true, the program exits with status 1, otherwise NULL is
 * returned.
502  */
503 static PGconn *
504 connect_database(const char *conninfo, bool exit_on_error)
505 {
506  PGconn *conn;
507  PGresult *res;
508 
509  conn = PQconnectdb(conninfo);
510  if (PQstatus(conn) != CONNECTION_OK)
511  {
512  pg_log_error("connection to database failed: %s",
514  PQfinish(conn);
515 
516  if (exit_on_error)
517  exit(1);
518  return NULL;
519  }
520 
521  /* Secure search_path */
524  {
525  pg_log_error("could not clear \"search_path\": %s",
527  PQclear(res);
528  PQfinish(conn);
529 
530  if (exit_on_error)
531  exit(1);
532  return NULL;
533  }
534  PQclear(res);
535 
536  return conn;
537 }
538 
539 /*
540  * Close the connection. If exit_on_error is true, it has an undesired
541  * condition and it should exit immediately.
542  */
543 static void
544 disconnect_database(PGconn *conn, bool exit_on_error)
545 {
546  Assert(conn != NULL);
547 
548  PQfinish(conn);
549 
550  if (exit_on_error)
551  exit(1);
552 }
553 
554 /*
555  * Obtain the system identifier of the publisher, connecting with the provided
556  * connection string. It will be used to compare if a data directory is a
 * clone of another one.
 *
 * The connection is opened with exit_on_error = true, and query errors or an
 * unexpected row count also terminate the program (status 1).
557  */
558 static uint64
559 get_primary_sysid(const char *conninfo)
560 {
561  PGconn *conn;
562  PGresult *res;
563  uint64 sysid;
564 
565  pg_log_info("getting system identifier from publisher");
566 
567  conn = connect_database(conninfo, true);
568 
569  res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
571  {
572  pg_log_error("could not get system identifier: %s",
574  disconnect_database(conn, true);
575  }
576  if (PQntuples(res) != 1)
577  {
578  pg_log_error("could not get system identifier: got %d rows, expected %d row",
579  PQntuples(res), 1);
580  disconnect_database(conn, true);
581  }
582 
583  sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
584 
585  pg_log_info("system identifier is %llu on publisher",
586  (unsigned long long) sysid);
587 
588  PQclear(res);
589  disconnect_database(conn, false);
590 
591  return sysid;
592 }
593 
594 /*
595  * Obtain the system identifier from control file. It will be used to compare
596  * if a data directory is a clone of another one. This routine is used locally
597  * and avoids a connection.
 *
 * Exits via pg_fatal() if the control file of datadir fails its CRC check.
598  */
599 static uint64
601 {
602  ControlFileData *cf;
603  bool crc_ok;
604  uint64 sysid;
605 
606  pg_log_info("getting system identifier from subscriber");
607 
608  cf = get_controlfile(datadir, &crc_ok);
609  if (!crc_ok)
610  pg_fatal("control file appears to be corrupt");
611 
612  sysid = cf->system_identifier;
613 
614  pg_log_info("system identifier is %llu on subscriber",
615  (unsigned long long) sysid);
616 
617  pg_free(cf);
618 
619  return sysid;
620 }
621 
622 /*
623  * Modify the system identifier. Since a standby server preserves the system
624  * identifier, it makes sense to change it to avoid situations in which WAL
625  * files from one of the systems might be used in the other one.
 *
 * Works on the control file of subscriber_dir, then runs pg_resetwal
 * (pg_resetwal_path) through system().  The pg_resetwal run is skipped in
 * dry-run mode, and the statement under the first !dry_run test is presumably
 * the control-file update (not visible in this rendering -- confirm).  The
 * log messages are emitted in either mode.  Exits via pg_fatal() on a
 * corrupt control file or when pg_resetwal fails.
 *
 * NOTE(review): cmd_str (psprintf) is never freed.
626  */
627 static void
629 {
630  ControlFileData *cf;
631  bool crc_ok;
632  struct timeval tv;
633 
634  char *cmd_str;
635 
636  pg_log_info("modifying system identifier of subscriber");
637 
638  cf = get_controlfile(subscriber_dir, &crc_ok);
639  if (!crc_ok)
640  pg_fatal("control file appears to be corrupt");
641 
642  /*
643  * Select a new system identifier.
 * Upper 32 bits: seconds; bits 12-31: microseconds; low 12 bits: PID.
644  *
645  * XXX this code was extracted from BootStrapXLOG().
646  */
647  gettimeofday(&tv, NULL);
648  cf->system_identifier = ((uint64) tv.tv_sec) << 32;
649  cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
650  cf->system_identifier |= getpid() & 0xFFF;
651 
652  if (!dry_run)
654 
655  pg_log_info("system identifier is %llu on subscriber",
656  (unsigned long long) cf->system_identifier);
657 
658  pg_log_info("running pg_resetwal on the subscriber");
659 
660  cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
662 
663  pg_log_debug("pg_resetwal command is: %s", cmd_str);
664 
665  if (!dry_run)
666  {
 /* any nonzero status from pg_resetwal is fatal */
667  int rc = system(cmd_str);
668 
669  if (rc == 0)
670  pg_log_info("subscriber successfully changed the system identifier");
671  else
672  pg_fatal("could not change system identifier of subscriber: %s", wait_result_to_str(rc));
673  }
674 
675  pg_free(cf);
676 }
677 
678 /*
679  * Generate an object name using a prefix, database oid and a random integer.
680  * It is used in case the user does not specify an object name (publication,
681  * subscription, replication slot).
 *
 * The result looks like "pg_createsubscriber_<oid>_<hex>" and is allocated
 * with psprintf().  The random part comes from the file-level prng_state,
 * which setup_publisher() seeds before calling this.  Query failures close
 * conn and terminate the program.
 *
 * NOTE(review): the local 'rand' shadows the C library function rand().
682  */
683 static char *
685 {
686  PGresult *res;
687  Oid oid;
688  uint32 rand;
689  char *objname;
690 
691  res = PQexec(conn,
692  "SELECT oid FROM pg_catalog.pg_database "
693  "WHERE datname = pg_catalog.current_database()");
695  {
696  pg_log_error("could not obtain database OID: %s",
698  disconnect_database(conn, true);
699  }
700 
701  if (PQntuples(res) != 1)
702  {
703  pg_log_error("could not obtain database OID: got %d rows, expected %d row",
704  PQntuples(res), 1);
705  disconnect_database(conn, true);
706  }
707 
708  /* Database OID */
709  oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
710 
711  PQclear(res);
712 
713  /* Random unsigned integer */
714  rand = pg_prng_uint32(&prng_state);
715 
716  /*
717  * Build the object name. The name must not exceed NAMEDATALEN - 1. This
718  * current schema uses at most 40 bytes including the terminating NUL
719  * (prefix 20 + OID 10 + '_' 1 + hex 8 + '\0' 1).
720  */
721  objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
722 
723  return objname;
724 }
725 
726 /*
727  * Create the publications and replication slots in preparation for logical
728  * replication. Returns the LSN from latest replication slot. It will be the
729  * replication start point that is used to adjust the subscriptions (see
730  * set_replication_progress).
 *
 * Also seeds prng_state for generate_object_name().  Only the LSN of the last
 * replication slot is returned; earlier ones are freed.  In dry-run mode the
 * returned value may be NULL; outside dry-run mode a NULL LSN from slot
 * creation terminates the program.  For the last database, an extra WAL
 * record is written with pg_log_standby_snapshot() (skipped in dry-run).
 *
 * NOTE(review): genname is only pg_strdup()'ed into dbinfo and does not
 * appear to be freed in the visible lines of the loop.
731  */
732 static char *
734 {
735  char *lsn = NULL;
736 
737  pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
738 
739  for (int i = 0; i < num_dbs; i++)
740  {
741  PGconn *conn;
742  char *genname = NULL;
743 
744  conn = connect_database(dbinfo[i].pubconninfo, true);
745 
746  /*
747  * If an object name was not specified as command-line options, assign
748  * a generated object name. The replication slot has a different rule.
749  * The subscription name is assigned to the replication slot name if
750  * no replication slot is specified. It follows the same rule as
751  * CREATE SUBSCRIPTION.
752  */
753  if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
754  genname = generate_object_name(conn);
755  if (num_pubs == 0)
756  dbinfo[i].pubname = pg_strdup(genname);
757  if (num_subs == 0)
758  dbinfo[i].subname = pg_strdup(genname);
759  if (num_replslots == 0)
761 
762  /*
763  * Create publication on publisher. This step should be executed
764  * *before* promoting the subscriber to avoid any transactions between
765  * consistent LSN and the new publication rows (such transactions
766  * wouldn't see the new publication rows resulting in an error).
767  */
769 
770  /* Create replication slot on publisher */
771  if (lsn)
772  pg_free(lsn);
774  if (lsn != NULL || dry_run)
775  pg_log_info("create replication slot \"%s\" on publisher",
776  dbinfo[i].replslotname);
777  else
778  exit(1);
779 
780  /*
781  * Since we are using the LSN returned by the last replication slot as
782  * recovery_target_lsn, this LSN is ahead of the current WAL position
783  * and the recovery waits until the publisher writes a WAL record to
784  * reach the target and ends the recovery. On idle systems, this wait
785  * time is unpredictable and could lead to failure in promoting the
786  * subscriber. To avoid that, insert a harmless WAL record.
787  */
788  if (i == num_dbs - 1 && !dry_run)
789  {
790  PGresult *res;
791 
792  res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
794  {
795  pg_log_error("could not write an additional WAL record: %s",
797  disconnect_database(conn, true);
798  }
799  PQclear(res);
800  }
801 
802  disconnect_database(conn, false);
803  }
804 
805  return lsn;
806 }
807 
808 /*
809  * Is recovery still in progress?
 *
 * Returns true when pg_is_in_recovery() reports 't' on conn.  A query failure
 * closes conn and terminates the program.  The result is read from row 0
 * without checking the row count.
810  */
811 static bool
813 {
814  PGresult *res;
815  int ret;
816 
817  res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
818 
820  {
821  pg_log_error("could not obtain recovery progress: %s",
823  disconnect_database(conn, true);
824  }
825 
826 
827  ret = strcmp("t", PQgetvalue(res, 0, 0));
828 
829  PQclear(res);
830 
831  return ret == 0;
832 }
833 
834 /*
835  * Is the primary server ready for logical replication?
 *
 * The checks run over the connection string of the first database only.
 * Every unmet requirement is logged before the program exits with status 1,
 * so the user sees all problems at once.  A nonzero
 * max_prepared_transactions only produces a warning.
836  *
837  * XXX Does it not allow a synchronous replica?
838  */
839 static void
841 {
842  PGconn *conn;
843  PGresult *res;
844  bool failed = false;
845 
846  char *wal_level;
847  int max_repslots;
848  int cur_repslots;
849  int max_walsenders;
850  int cur_walsenders;
851  int max_prepared_transactions;
852 
853  pg_log_info("checking settings on publisher");
854 
855  conn = connect_database(dbinfo[0].pubconninfo, true);
856 
857  /*
858  * If the primary server is in recovery (i.e. cascading replication),
859  * objects (publication) cannot be created because it is read only.
860  */
862  {
863  pg_log_error("primary server cannot be in recovery");
864  disconnect_database(conn, true);
865  }
866 
867  /*------------------------------------------------------------------------
868  * Logical replication requires a few parameters to be set on publisher.
869  * Since these parameters are not a requirement for physical replication,
870  * we should check it to make sure it won't fail.
871  *
872  * - wal_level = logical
873  * - max_replication_slots >= current + number of dbs to be converted
874  * - max_wal_senders >= current + number of dbs to be converted
875  * -----------------------------------------------------------------------
876  */
877  res = PQexec(conn,
878  "SELECT pg_catalog.current_setting('wal_level'),"
879  " pg_catalog.current_setting('max_replication_slots'),"
880  " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
881  " pg_catalog.current_setting('max_wal_senders'),"
882  " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
883  " pg_catalog.current_setting('max_prepared_transactions')");
884 
886  {
887  pg_log_error("could not obtain publisher settings: %s",
889  disconnect_database(conn, true);
890  }
891 
 /* columns follow the order of the SELECT list above */
892  wal_level = pg_strdup(PQgetvalue(res, 0, 0));
893  max_repslots = atoi(PQgetvalue(res, 0, 1));
894  cur_repslots = atoi(PQgetvalue(res, 0, 2));
895  max_walsenders = atoi(PQgetvalue(res, 0, 3));
896  cur_walsenders = atoi(PQgetvalue(res, 0, 4));
897  max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
898 
899  PQclear(res);
900 
901  pg_log_debug("publisher: wal_level: %s", wal_level);
902  pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
903  pg_log_debug("publisher: current replication slots: %d", cur_repslots);
904  pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
905  pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
906  pg_log_debug("publisher: max_prepared_transactions: %d",
907  max_prepared_transactions);
908 
909  disconnect_database(conn, false);
910 
911  if (strcmp(wal_level, "logical") != 0)
912  {
913  pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
914  failed = true;
915  }
916 
917  if (max_repslots - cur_repslots < num_dbs)
918  {
919  pg_log_error("publisher requires %d replication slots, but only %d remain",
920  num_dbs, max_repslots - cur_repslots);
921  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
922  "max_replication_slots", cur_repslots + num_dbs);
923  failed = true;
924  }
925 
926  if (max_walsenders - cur_walsenders < num_dbs)
927  {
928  pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
929  num_dbs, max_walsenders - cur_walsenders);
930  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
931  "max_wal_senders", cur_walsenders + num_dbs);
932  failed = true;
933  }
934 
935  if (max_prepared_transactions != 0)
936  {
937  pg_log_warning("two_phase option will not be enabled for replication slots");
938  pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
939  "Prepared transactions will be replicated at COMMIT PREPARED.");
940  }
941 
943 
944  if (failed)
945  exit(1);
946 }
947 
948 /*
949  * Is the standby server ready for logical replication?
 *
 * The checks run over the connection string of the first database only.
 * Every unmet requirement is logged before the program exits with status 1.
 * NOTE(review): the "but only %d remain" messages report the configured
 * maximum, not a count of unused resources.
950  *
951  * XXX Does it not allow a time-delayed replica?
952  *
953  * XXX In a cascaded replication scenario (P -> S -> C), if the target server
954  * is S, it cannot detect there is a replica (server C) because server S starts
955  * accepting only local connections and server C cannot connect to it. Hence,
956  * there is not a reliable way to provide a suitable error saying the server C
957  * will be broken at the end of this process (due to pg_resetwal).
958  */
959 static void
961 {
962  PGconn *conn;
963  PGresult *res;
964  bool failed = false;
965 
966  int max_lrworkers;
967  int max_repslots;
968  int max_wprocs;
969 
970  pg_log_info("checking settings on subscriber");
971 
972  conn = connect_database(dbinfo[0].subconninfo, true);
973 
974  /* The target server must be a standby */
976  {
977  pg_log_error("target server must be a standby");
978  disconnect_database(conn, true);
979  }
980 
981  /*------------------------------------------------------------------------
982  * Logical replication requires a few parameters to be set on subscriber.
983  * Since these parameters are not a requirement for physical replication,
984  * we should check it to make sure it won't fail.
985  *
986  * - max_replication_slots >= number of dbs to be converted
987  * - max_logical_replication_workers >= number of dbs to be converted
988  * - max_worker_processes >= 1 + number of dbs to be converted
989  *------------------------------------------------------------------------
990  */
991  res = PQexec(conn,
992  "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
993  "'max_logical_replication_workers', "
994  "'max_replication_slots', "
995  "'max_worker_processes', "
996  "'primary_slot_name') "
997  "ORDER BY name");
998 
1000  {
1001  pg_log_error("could not obtain subscriber settings: %s",
1003  disconnect_database(conn, true);
1004  }
1005 
 /*
 * ORDER BY name fixes the row order alphabetically: 0 =
 * max_logical_replication_workers, 1 = max_replication_slots,
 * 2 = max_worker_processes, 3 = primary_slot_name.
 */
1006  max_lrworkers = atoi(PQgetvalue(res, 0, 0));
1007  max_repslots = atoi(PQgetvalue(res, 1, 0));
1008  max_wprocs = atoi(PQgetvalue(res, 2, 0));
1009  if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1011 
1012  pg_log_debug("subscriber: max_logical_replication_workers: %d",
1013  max_lrworkers);
1014  pg_log_debug("subscriber: max_replication_slots: %d", max_repslots);
1015  pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1016  if (primary_slot_name)
1017  pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1018 
1019  PQclear(res);
1020 
1021  disconnect_database(conn, false);
1022 
1023  if (max_repslots < num_dbs)
1024  {
1025  pg_log_error("subscriber requires %d replication slots, but only %d remain",
1026  num_dbs, max_repslots);
1027  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1028  "max_replication_slots", num_dbs);
1029  failed = true;
1030  }
1031 
1032  if (max_lrworkers < num_dbs)
1033  {
1034  pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1035  num_dbs, max_lrworkers);
1036  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1037  "max_logical_replication_workers", num_dbs);
1038  failed = true;
1039  }
1040 
1041  if (max_wprocs < num_dbs + 1)
1042  {
1043  pg_log_error("subscriber requires %d worker processes, but only %d remain",
1044  num_dbs + 1, max_wprocs);
1045  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1046  "max_worker_processes", num_dbs + 1);
1047  failed = true;
1048  }
1049 
1050  if (failed)
1051  exit(1);
1052 }
1053 
1054 /*
1055  * Drop a specified subscription. This is to avoid duplicate subscriptions on
1056  * the primary (publisher node) and the newly created subscriber. We
1057  * shouldn't drop the associated slot as that would be used by the publisher
1058  * node.
1059  */
1060 static void
1062 {
1063  PQExpBuffer query = createPQExpBuffer();
1064  PGresult *res;
1065 
1066  Assert(conn != NULL);
1067 
1068  /*
1069  * Construct a query string. These commands are allowed to be executed
1070  * within a transaction.
1071  */
1072  appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1073  subname);
1074  appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1075  subname);
1076  appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1077 
1078  pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1079  subname, dbname);
1080 
1081  if (!dry_run)
1082  {
1083  res = PQexec(conn, query->data);
1084 
1086  {
1087  pg_log_error("could not drop subscription \"%s\": %s",
1089  disconnect_database(conn, true);
1090  }
1091 
1092  PQclear(res);
1093  }
1094 
1095  destroyPQExpBuffer(query);
1096 }
1097 
1098 /*
1099  * Retrieve and drop the pre-existing subscriptions.
1100  */
1101 static void
1103  const struct LogicalRepInfo *dbinfo)
1104 {
1105  PQExpBuffer query = createPQExpBuffer();
1106  char *dbname;
1107  PGresult *res;
1108 
1109  Assert(conn != NULL);
1110 
1112 
1113  appendPQExpBuffer(query,
1114  "SELECT s.subname FROM pg_catalog.pg_subscription s "
1115  "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1116  "WHERE d.datname = %s",
1117  dbname);
1118  res = PQexec(conn, query->data);
1119 
1121  {
1122  pg_log_error("could not obtain pre-existing subscriptions: %s",
1124  disconnect_database(conn, true);
1125  }
1126 
1127  for (int i = 0; i < PQntuples(res); i++)
1129  dbinfo->dbname);
1130 
1131  PQclear(res);
1132  destroyPQExpBuffer(query);
1133 }
1134 
1135 /*
1136  * Create the subscriptions, adjust the initial location for logical
1137  * replication and enable the subscriptions. That's the last step for logical
1138  * replication setup.
1139  */
1140 static void
1141 setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
1142 {
1143  for (int i = 0; i < num_dbs; i++)
1144  {
1145  PGconn *conn;
1146 
1147  /* Connect to subscriber. */
1148  conn = connect_database(dbinfo[i].subconninfo, true);
1149 
1150  /*
1151  * We don't need the pre-existing subscriptions on the newly formed
1152  * subscriber. They can connect to other publisher nodes and either
1153  * get some unwarranted data or can lead to ERRORs in connecting to
1154  * such nodes.
1155  */
1157 
1158  /*
1159  * Since the publication was created before the consistent LSN, it is
1160  * available on the subscriber when the physical replica is promoted.
1161  * Remove publications from the subscriber because it has no use.
1162  */
1164 
1166 
1167  /* Set the replication progress to the correct LSN */
1168  set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1169 
1170  /* Enable subscription */
1172 
1173  disconnect_database(conn, false);
1174  }
1175 }
1176 
1177 /*
1178  * Write the required recovery parameters.
1179  */
1180 static void
1181 setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
1182 {
1183  PGconn *conn;
1185 
1186  /*
1187  * Despite of the recovery parameters will be written to the subscriber,
1188  * use a publisher connection. The primary_conninfo is generated using the
1189  * connection settings.
1190  */
1191  conn = connect_database(dbinfo[0].pubconninfo, true);
1192 
1193  /*
1194  * Write recovery parameters.
1195  *
1196  * The subscriber is not running yet. In dry run mode, the recovery
1197  * parameters *won't* be written. An invalid LSN is used for printing
1198  * purposes. Additional recovery parameters are added here. It avoids
1199  * unexpected behavior such as end of recovery as soon as a consistent
1200  * state is reached (recovery_target) and failure due to multiple recovery
1201  * targets (name, time, xid, LSN).
1202  */
1204  appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
1206  "recovery_target_timeline = 'latest'\n");
1208  "recovery_target_inclusive = true\n");
1210  "recovery_target_action = promote\n");
1211  appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
1212  appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
1213  appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
1214 
1215  if (dry_run)
1216  {
1217  appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
1219  "recovery_target_lsn = '%X/%X'\n",
1221  }
1222  else
1223  {
1224  appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1225  lsn);
1227  }
1228  disconnect_database(conn, false);
1229 
1230  pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1231 }
1232 
1233 /*
1234  * Drop physical replication slot on primary if the standby was using it. After
1235  * the transformation, it has no use.
1236  *
1237  * XXX we might not fail here. Instead, we provide a warning so the user
1238  * eventually drops this replication slot later.
1239  */
1240 static void
1242 {
1243  PGconn *conn;
1244 
1245  /* Replication slot does not exist, do nothing */
1246  if (!primary_slot_name)
1247  return;
1248 
1249  conn = connect_database(dbinfo[0].pubconninfo, false);
1250  if (conn != NULL)
1251  {
1252  drop_replication_slot(conn, &dbinfo[0], slotname);
1253  disconnect_database(conn, false);
1254  }
1255  else
1256  {
1257  pg_log_warning("could not drop replication slot \"%s\" on primary",
1258  slotname);
1259  pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1260  }
1261 }
1262 
1263 /*
1264  * Drop failover replication slots on subscriber. After the transformation,
1265  * they have no use.
1266  *
1267  * XXX We do not fail here. Instead, we provide a warning so the user can drop
1268  * them later.
1269  */
1270 static void
1272 {
1273  PGconn *conn;
1274  PGresult *res;
1275 
1276  conn = connect_database(dbinfo[0].subconninfo, false);
1277  if (conn != NULL)
1278  {
1279  /* Get failover replication slot names */
1280  res = PQexec(conn,
1281  "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1282 
1284  {
1285  /* Remove failover replication slots from subscriber */
1286  for (int i = 0; i < PQntuples(res); i++)
1288  }
1289  else
1290  {
1291  pg_log_warning("could not obtain failover replication slot information: %s",
1293  pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1294  }
1295 
1296  PQclear(res);
1297  disconnect_database(conn, false);
1298  }
1299  else
1300  {
1301  pg_log_warning("could not drop failover replication slot");
1302  pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1303  }
1304 }
1305 
1306 /*
1307  * Create a logical replication slot and returns a LSN.
1308  *
1309  * CreateReplicationSlot() is not used because it does not provide the one-row
1310  * result set that contains the LSN.
1311  */
1312 static char *
1314 {
1316  PGresult *res = NULL;
1317  const char *slot_name = dbinfo->replslotname;
1318  char *slot_name_esc;
1319  char *lsn = NULL;
1320 
1321  Assert(conn != NULL);
1322 
1323  pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
1324  slot_name, dbinfo->dbname);
1325 
1326  slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1327 
1329  "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
1330  slot_name_esc);
1331 
1332  pg_free(slot_name_esc);
1333 
1334  pg_log_debug("command is: %s", str->data);
1335 
1336  if (!dry_run)
1337  {
1338  res = PQexec(conn, str->data);
1340  {
1341  pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1342  slot_name, dbinfo->dbname,
1344  PQclear(res);
1346  return NULL;
1347  }
1348 
1349  lsn = pg_strdup(PQgetvalue(res, 0, 0));
1350  PQclear(res);
1351  }
1352 
1353  /* For cleanup purposes */
1354  dbinfo->made_replslot = true;
1355 
1357 
1358  return lsn;
1359 }
1360 
1361 static void
1363  const char *slot_name)
1364 {
1366  char *slot_name_esc;
1367  PGresult *res;
1368 
1369  Assert(conn != NULL);
1370 
1371  pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1372  slot_name, dbinfo->dbname);
1373 
1374  slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1375 
1376  appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1377 
1378  pg_free(slot_name_esc);
1379 
1380  pg_log_debug("command is: %s", str->data);
1381 
1382  if (!dry_run)
1383  {
1384  res = PQexec(conn, str->data);
1386  {
1387  pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1388  slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1389  dbinfo->made_replslot = false; /* don't try again. */
1390  }
1391 
1392  PQclear(res);
1393  }
1394 
1396 }
1397 
/*
 * Report why a pg_ctl invocation failed, then terminate the program.
 *
 * "pg_ctl_cmd" is the command line that was handed to system(), and "rc" is
 * the raw status system() returned for it.  A zero status means success, so
 * this function simply returns.  For any other status it logs one of three
 * diagnoses, then logs the failed command and exits with status 1:
 *   - a normal exit: the exit code;
 *   - termination by a signal: the signal number and name (on Windows, the
 *     exception code);
 *   - anything else: the raw status.
 */
static void
pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
	/* pg_ctl succeeded: nothing to report. */
	if (rc == 0)
		return;

	if (WIFEXITED(rc))
		pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
	else if (WIFSIGNALED(rc))
	{
		int			termsig = WTERMSIG(rc);

#if defined(WIN32)
		pg_log_error("pg_ctl was terminated by exception 0x%X",
					 termsig);
		pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
		pg_log_error("pg_ctl was terminated by signal %d: %s",
					 termsig, pg_strsignal(termsig));
#endif
	}
	else
		pg_log_error("pg_ctl exited with unrecognized status %d", rc);

	/* Always show the command that failed before bailing out. */
	pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
	exit(1);
}
1430 
1431 static void
1432 start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
1433  bool restrict_logical_worker)
1434 {
1435  PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1436  int rc;
1437 
1438  appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1439  appendShellString(pg_ctl_cmd, subscriber_dir);
1440  appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1441  if (restricted_access)
1442  {
1443  appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1444 #if !defined(WIN32)
1445 
1446  /*
1447  * An empty listen_addresses list means the server does not listen on
1448  * any IP interfaces; only Unix-domain sockets can be used to connect
1449  * to the server. Prevent external connections to minimize the chance
1450  * of failure.
1451  */
1452  appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1453  if (opt->socket_dir)
1454  appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1455  opt->socket_dir);
1456  appendPQExpBufferChar(pg_ctl_cmd, '"');
1457 #endif
1458  }
1459  if (opt->config_file != NULL)
1460  appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1461  opt->config_file);
1462 
1463  /* Suppress to start logical replication if requested */
1464  if (restrict_logical_worker)
1465  appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1466 
1467  pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1468  rc = system(pg_ctl_cmd->data);
1469  pg_ctl_status(pg_ctl_cmd->data, rc);
1470  standby_running = true;
1471  destroyPQExpBuffer(pg_ctl_cmd);
1472  pg_log_info("server was started");
1473 }
1474 
1475 static void
1477 {
1478  char *pg_ctl_cmd;
1479  int rc;
1480 
1481  pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1482  datadir);
1483  pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1484  rc = system(pg_ctl_cmd);
1485  pg_ctl_status(pg_ctl_cmd, rc);
1486  standby_running = false;
1487  pg_log_info("server was stopped");
1488 }
1489 
1490 /*
1491  * Returns after the server finishes the recovery process.
1492  *
1493  * If recovery_timeout option is set, terminate abnormally without finishing
1494  * the recovery process. By default, it waits forever.
1495  *
1496  * XXX Is the recovery process still in progress? When recovery process has a
1497  * better progress reporting mechanism, it should be added here.
1498  */
1499 static void
1500 wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
1501 {
1502  PGconn *conn;
1503  int status = POSTMASTER_STILL_STARTING;
1504  int timer = 0;
1505 
1506  pg_log_info("waiting for the target server to reach the consistent state");
1507 
1508  conn = connect_database(conninfo, true);
1509 
1510  for (;;)
1511  {
1512  bool in_recovery = server_is_in_recovery(conn);
1513 
1514  /*
1515  * Does the recovery process finish? In dry run mode, there is no
1516  * recovery mode. Bail out as the recovery process has ended.
1517  */
1518  if (!in_recovery || dry_run)
1519  {
1520  status = POSTMASTER_READY;
1521  recovery_ended = true;
1522  break;
1523  }
1524 
1525  /* Bail out after recovery_timeout seconds if this option is set */
1526  if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1527  {
1529  pg_log_error("recovery timed out");
1530  disconnect_database(conn, true);
1531  }
1532 
1533  /* Keep waiting */
1535 
1536  timer += WAIT_INTERVAL;
1537  }
1538 
1539  disconnect_database(conn, false);
1540 
1541  if (status == POSTMASTER_STILL_STARTING)
1542  pg_fatal("server did not end recovery");
1543 
1544  pg_log_info("target server reached the consistent state");
1545  pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1546 }
1547 
1548 /*
1549  * Create a publication that includes all tables in the database.
1550  */
1551 static void
1553 {
1555  PGresult *res;
1556  char *ipubname_esc;
1557  char *spubname_esc;
1558 
1559  Assert(conn != NULL);
1560 
1561  ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1562  spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1563 
1564  /* Check if the publication already exists */
1566  "SELECT 1 FROM pg_catalog.pg_publication "
1567  "WHERE pubname = %s",
1568  spubname_esc);
1569  res = PQexec(conn, str->data);
1571  {
1572  pg_log_error("could not obtain publication information: %s",
1574  disconnect_database(conn, true);
1575  }
1576 
1577  if (PQntuples(res) == 1)
1578  {
1579  /*
1580  * Unfortunately, if it reaches this code path, it will always fail
1581  * (unless you decide to change the existing publication name). That's
1582  * bad but it is very unlikely that the user will choose a name with
1583  * pg_createsubscriber_ prefix followed by the exact database oid and
1584  * a random number.
1585  */
1586  pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1587  pg_log_error_hint("Consider renaming this publication before continuing.");
1588  disconnect_database(conn, true);
1589  }
1590 
1591  PQclear(res);
1593 
1594  pg_log_info("creating publication \"%s\" in database \"%s\"",
1596 
1597  appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1598  ipubname_esc);
1599 
1600  pg_log_debug("command is: %s", str->data);
1601 
1602  if (!dry_run)
1603  {
1604  res = PQexec(conn, str->data);
1606  {
1607  pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1609  disconnect_database(conn, true);
1610  }
1611  PQclear(res);
1612  }
1613 
1614  /* For cleanup purposes */
1615  dbinfo->made_publication = true;
1616 
1617  pg_free(ipubname_esc);
1618  pg_free(spubname_esc);
1620 }
1621 
1622 /*
1623  * Remove publication if it couldn't finish all steps.
1624  */
1625 static void
1627 {
1629  PGresult *res;
1630  char *pubname_esc;
1631 
1632  Assert(conn != NULL);
1633 
1634  pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1635 
1636  pg_log_info("dropping publication \"%s\" in database \"%s\"",
1638 
1639  appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1640 
1641  pg_free(pubname_esc);
1642 
1643  pg_log_debug("command is: %s", str->data);
1644 
1645  if (!dry_run)
1646  {
1647  res = PQexec(conn, str->data);
1649  {
1650  pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1652  dbinfo->made_publication = false; /* don't try again. */
1653 
1654  /*
1655  * Don't disconnect and exit here. This routine is used by primary
1656  * (cleanup publication / replication slot due to an error) and
1657  * subscriber (remove the replicated publications). In both cases,
1658  * it can continue and provide instructions for the user to remove
1659  * it later if cleanup fails.
1660  */
1661  }
1662  PQclear(res);
1663  }
1664 
1666 }
1667 
1668 /*
1669  * Create a subscription with some predefined options.
1670  *
1671  * A replication slot was already created in a previous step. Let's use it. It
1672  * is not required to copy data. The subscription will be created but it will
1673  * not be enabled now. That's because the replication progress must be set and
1674  * the replication origin name (one of the function arguments) contains the
1675  * subscription OID in its name. Once the subscription is created,
1676  * set_replication_progress() can obtain the chosen origin name and set up its
1677  * initial location.
1678  */
1679 static void
1681 {
1683  PGresult *res;
1684  char *pubname_esc;
1685  char *subname_esc;
1686  char *pubconninfo_esc;
1687  char *replslotname_esc;
1688 
1689  Assert(conn != NULL);
1690 
1691  pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1692  subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1693  pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1694  replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1695 
1696  pg_log_info("creating subscription \"%s\" in database \"%s\"",
1698 
1700  "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1701  "WITH (create_slot = false, enabled = false, "
1702  "slot_name = %s, copy_data = false)",
1703  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
1704 
1705  pg_free(pubname_esc);
1706  pg_free(subname_esc);
1707  pg_free(pubconninfo_esc);
1708  pg_free(replslotname_esc);
1709 
1710  pg_log_debug("command is: %s", str->data);
1711 
1712  if (!dry_run)
1713  {
1714  res = PQexec(conn, str->data);
1716  {
1717  pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1719  disconnect_database(conn, true);
1720  }
1721  PQclear(res);
1722  }
1723 
1725 }
1726 
1727 /*
1728  * Sets the replication progress to the consistent LSN.
1729  *
1730  * The subscriber caught up to the consistent LSN provided by the last
1731  * replication slot that was created. The goal is to set up the initial
1732  * location for the logical replication that is the exact LSN that the
1733  * subscriber was promoted. Once the subscription is enabled it will start
1734  * streaming from that location onwards. In dry run mode, the subscription OID
1735  * and LSN are set to invalid values for printing purposes.
1736  */
1737 static void
1738 set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
1739 {
1741  PGresult *res;
1742  Oid suboid;
1743  char *subname;
1744  char *dbname;
1745  char *originname;
1746  char *lsnstr;
1747 
1748  Assert(conn != NULL);
1749 
1752 
1754  "SELECT s.oid FROM pg_catalog.pg_subscription s "
1755  "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1756  "WHERE s.subname = %s AND d.datname = %s",
1757  subname, dbname);
1758 
1759  res = PQexec(conn, str->data);
1761  {
1762  pg_log_error("could not obtain subscription OID: %s",
1764  disconnect_database(conn, true);
1765  }
1766 
1767  if (PQntuples(res) != 1 && !dry_run)
1768  {
1769  pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1770  PQntuples(res), 1);
1771  disconnect_database(conn, true);
1772  }
1773 
1774  if (dry_run)
1775  {
1776  suboid = InvalidOid;
1777  lsnstr = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1778  }
1779  else
1780  {
1781  suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1782  lsnstr = psprintf("%s", lsn);
1783  }
1784 
1785  PQclear(res);
1786 
1787  /*
1788  * The origin name is defined as pg_%u. %u is the subscription OID. See
1789  * ApplyWorkerMain().
1790  */
1791  originname = psprintf("pg_%u", suboid);
1792 
1793  pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1794  originname, lsnstr, dbinfo->dbname);
1795 
1798  "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1799  originname, lsnstr);
1800 
1801  pg_log_debug("command is: %s", str->data);
1802 
1803  if (!dry_run)
1804  {
1805  res = PQexec(conn, str->data);
1807  {
1808  pg_log_error("could not set replication progress for subscription \"%s\": %s",
1810  disconnect_database(conn, true);
1811  }
1812  PQclear(res);
1813  }
1814 
1815  pg_free(subname);
1816  pg_free(dbname);
1817  pg_free(originname);
1818  pg_free(lsnstr);
1820 }
1821 
1822 /*
1823  * Enables the subscription.
1824  *
1825  * The subscription was created in a previous step but it was disabled. After
1826  * adjusting the initial logical replication location, enable the subscription.
1827  */
1828 static void
1830 {
1832  PGresult *res;
1833  char *subname;
1834 
1835  Assert(conn != NULL);
1836 
1838 
1839  pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1841 
1842  appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1843 
1844  pg_log_debug("command is: %s", str->data);
1845 
1846  if (!dry_run)
1847  {
1848  res = PQexec(conn, str->data);
1850  {
1851  pg_log_error("could not enable subscription \"%s\": %s",
1853  disconnect_database(conn, true);
1854  }
1855 
1856  PQclear(res);
1857  }
1858 
1859  pg_free(subname);
1861 }
1862 
1863 int
1864 main(int argc, char **argv)
1865 {
1866  static struct option long_options[] =
1867  {
1868  {"database", required_argument, NULL, 'd'},
1869  {"pgdata", required_argument, NULL, 'D'},
1870  {"dry-run", no_argument, NULL, 'n'},
1871  {"subscriber-port", required_argument, NULL, 'p'},
1872  {"publisher-server", required_argument, NULL, 'P'},
1873  {"socketdir", required_argument, NULL, 's'},
1874  {"recovery-timeout", required_argument, NULL, 't'},
1875  {"subscriber-username", required_argument, NULL, 'U'},
1876  {"verbose", no_argument, NULL, 'v'},
1877  {"version", no_argument, NULL, 'V'},
1878  {"help", no_argument, NULL, '?'},
1879  {"config-file", required_argument, NULL, 1},
1880  {"publication", required_argument, NULL, 2},
1881  {"replication-slot", required_argument, NULL, 3},
1882  {"subscription", required_argument, NULL, 4},
1883  {NULL, 0, NULL, 0}
1884  };
1885 
1886  struct CreateSubscriberOptions opt = {0};
1887 
1888  int c;
1889  int option_index;
1890 
1891  char *pub_base_conninfo;
1892  char *sub_base_conninfo;
1893  char *dbname_conninfo = NULL;
1894 
1895  uint64 pub_sysid;
1896  uint64 sub_sysid;
1897  struct stat statbuf;
1898 
1899  char *consistent_lsn;
1900 
1901  char pidfile[MAXPGPATH];
1902 
1903  pg_logging_init(argv[0]);
1905  progname = get_progname(argv[0]);
1906  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
1907 
1908  if (argc > 1)
1909  {
1910  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
1911  {
1912  usage();
1913  exit(0);
1914  }
1915  else if (strcmp(argv[1], "-V") == 0
1916  || strcmp(argv[1], "--version") == 0)
1917  {
1918  puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
1919  exit(0);
1920  }
1921  }
1922 
1923  /* Default settings */
1924  subscriber_dir = NULL;
1925  opt.config_file = NULL;
1926  opt.pub_conninfo_str = NULL;
1927  opt.socket_dir = NULL;
1928  opt.sub_port = DEFAULT_SUB_PORT;
1929  opt.sub_username = NULL;
1931  {
1932  0
1933  };
1934  opt.recovery_timeout = 0;
1935 
1936  /*
1937  * Don't allow it to be run as root. It uses pg_ctl which does not allow
1938  * it either.
1939  */
1940 #ifndef WIN32
1941  if (geteuid() == 0)
1942  {
1943  pg_log_error("cannot be executed by \"root\"");
1944  pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
1945  progname);
1946  exit(1);
1947  }
1948 #endif
1949 
1951 
1952  while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
1953  long_options, &option_index)) != -1)
1954  {
1955  switch (c)
1956  {
1957  case 'd':
1959  {
1961  num_dbs++;
1962  }
1963  else
1964  {
1965  pg_log_error("database \"%s\" specified more than once", optarg);
1966  exit(1);
1967  }
1968  break;
1969  case 'D':
1972  break;
1973  case 'n':
1974  dry_run = true;
1975  break;
1976  case 'p':
1977  opt.sub_port = pg_strdup(optarg);
1978  break;
1979  case 'P':
1981  break;
1982  case 's':
1983  opt.socket_dir = pg_strdup(optarg);
1985  break;
1986  case 't':
1987  opt.recovery_timeout = atoi(optarg);
1988  break;
1989  case 'U':
1990  opt.sub_username = pg_strdup(optarg);
1991  break;
1992  case 'v':
1994  break;
1995  case 1:
1996  opt.config_file = pg_strdup(optarg);
1997  break;
1998  case 2:
2000  {
2002  num_pubs++;
2003  }
2004  else
2005  {
2006  pg_log_error("publication \"%s\" specified more than once", optarg);
2007  exit(1);
2008  }
2009  break;
2010  case 3:
2012  {
2014  num_replslots++;
2015  }
2016  else
2017  {
2018  pg_log_error("replication slot \"%s\" specified more than once", optarg);
2019  exit(1);
2020  }
2021  break;
2022  case 4:
2024  {
2026  num_subs++;
2027  }
2028  else
2029  {
2030  pg_log_error("subscription \"%s\" specified more than once", optarg);
2031  exit(1);
2032  }
2033  break;
2034  default:
2035  /* getopt_long already emitted a complaint */
2036  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2037  exit(1);
2038  }
2039  }
2040 
2041  /* Any non-option arguments? */
2042  if (optind < argc)
2043  {
2044  pg_log_error("too many command-line arguments (first is \"%s\")",
2045  argv[optind]);
2046  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2047  exit(1);
2048  }
2049 
2050  /* Required arguments */
2051  if (subscriber_dir == NULL)
2052  {
2053  pg_log_error("no subscriber data directory specified");
2054  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2055  exit(1);
2056  }
2057 
2058  /* If socket directory is not provided, use the current directory */
2059  if (opt.socket_dir == NULL)
2060  {
2061  char cwd[MAXPGPATH];
2062 
2063  if (!getcwd(cwd, MAXPGPATH))
2064  pg_fatal("could not determine current directory");
2065  opt.socket_dir = pg_strdup(cwd);
2067  }
2068 
2069  /*
2070  * Parse connection string. Build a base connection string that might be
2071  * reused by multiple databases.
2072  */
2073  if (opt.pub_conninfo_str == NULL)
2074  {
2075  /*
2076  * TODO use primary_conninfo (if available) from subscriber and
2077  * extract publisher connection string. Assume that there are
2078  * identical entries for physical and logical replication. If there is
2079  * not, we would fail anyway.
2080  */
2081  pg_log_error("no publisher connection string specified");
2082  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2083  exit(1);
2084  }
2085  pg_log_info("validating publisher connection string");
2086  pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2087  &dbname_conninfo);
2088  if (pub_base_conninfo == NULL)
2089  exit(1);
2090 
2091  pg_log_info("validating subscriber connection string");
2092  sub_base_conninfo = get_sub_conninfo(&opt);
2093 
2094  if (opt.database_names.head == NULL)
2095  {
2096  pg_log_info("no database was specified");
2097 
2098  /*
2099  * If --database option is not provided, try to obtain the dbname from
2100  * the publisher conninfo. If dbname parameter is not available, error
2101  * out.
2102  */
2103  if (dbname_conninfo)
2104  {
2105  simple_string_list_append(&opt.database_names, dbname_conninfo);
2106  num_dbs++;
2107 
2108  pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2109  dbname_conninfo);
2110  }
2111  else
2112  {
2113  pg_log_error("no database name specified");
2114  pg_log_error_hint("Try \"%s --help\" for more information.",
2115  progname);
2116  exit(1);
2117  }
2118  }
2119 
2120  /* Number of object names must match number of databases */
2121  if (num_pubs > 0 && num_pubs != num_dbs)
2122  {
2123  pg_log_error("wrong number of publication names specified");
2124  pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2125  num_pubs, num_dbs);
2126  exit(1);
2127  }
2128  if (num_subs > 0 && num_subs != num_dbs)
2129  {
2130  pg_log_error("wrong number of subscription names specified");
2131  pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2132  num_subs, num_dbs);
2133  exit(1);
2134  }
2135  if (num_replslots > 0 && num_replslots != num_dbs)
2136  {
2137  pg_log_error("wrong number of replication slot names specified");
2138  pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2140  exit(1);
2141  }
2142 
2143  /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2144  pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2145  pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2146 
2147  /* Rudimentary check for a data directory */
2149 
2150  /*
2151  * Store database information for publisher and subscriber. It should be
2152  * called before atexit() because its return is used in the
2153  * cleanup_objects_atexit().
2154  */
2155  dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2156 
2157  /* Register a function to clean up objects in case of failure */
2158  atexit(cleanup_objects_atexit);
2159 
2160  /*
2161  * Check if the subscriber data directory has the same system identifier
2162  * than the publisher data directory.
2163  */
2164  pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
2165  sub_sysid = get_standby_sysid(subscriber_dir);
2166  if (pub_sysid != sub_sysid)
2167  pg_fatal("subscriber data directory is not a copy of the source database cluster");
2168 
2169  /* Subscriber PID file */
2170  snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2171 
2172  /*
2173  * The standby server must not be running. If the server is started under
2174  * service manager and pg_createsubscriber stops it, the service manager
2175  * might react to this action and start the server again. Therefore,
2176  * refuse to proceed if the server is running to avoid possible failures.
2177  */
2178  if (stat(pidfile, &statbuf) == 0)
2179  {
2180  pg_log_error("standby server is running");
2181  pg_log_error_hint("Stop the standby server and try again.");
2182  exit(1);
2183  }
2184 
2185  /*
2186  * Start a short-lived standby server with temporary parameters (provided
2187  * by command-line options). The goal is to avoid connections during the
2188  * transformation steps.
2189  */
2190  pg_log_info("starting the standby server with command-line options");
2191  start_standby_server(&opt, true, false);
2192 
2193  /* Check if the standby server is ready for logical replication */
2195 
2196  /* Check if the primary server is ready for logical replication */
2198 
2199  /*
2200  * Stop the target server. The recovery process requires that the server
2201  * reaches a consistent state before targeting the recovery stop point.
2202  * Make sure a consistent state is reached (stop the target server
2203  * guarantees it) *before* creating the replication slots in
2204  * setup_publisher().
2205  */
2206  pg_log_info("stopping the subscriber");
2208 
2209  /* Create the required objects for each database on publisher */
2210  consistent_lsn = setup_publisher(dbinfo);
2211 
2212  /* Write the required recovery parameters */
2213  setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
2214 
2215  /*
2216  * Start subscriber so the recovery parameters will take effect. Wait
2217  * until accepting connections. We don't want to start logical replication
2218  * during setup.
2219  */
2220  pg_log_info("starting the subscriber");
2221  start_standby_server(&opt, true, true);
2222 
2223  /* Waiting the subscriber to be promoted */
2224  wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
2225 
2226  /*
2227  * Create the subscription for each database on subscriber. It does not
2228  * enable it immediately because it needs to adjust the replication start
2229  * point to the LSN reported by setup_publisher(). It also cleans up
2230  * publications created by this tool and replication to the standby.
2231  */
2232  setup_subscriber(dbinfo, consistent_lsn);
2233 
2234  /* Remove primary_slot_name if it exists on primary */
2236 
2237  /* Remove failover replication slots if they exist on subscriber */
2239 
2240  /* Stop the subscriber */
2241  pg_log_info("stopping the subscriber");
2243 
2244  /* Change system identifier from subscriber */
2246 
2247  success = true;
2248 
2249  pg_log_info("Done!");
2250 
2251  return 0;
2252 }
unsigned int uint32
Definition: c.h:492
#define Assert(condition)
Definition: c.h:837
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1193
#define strtou64(str, endptr, base)
Definition: c.h:1277
int find_my_exec(const char *argv0, char *retpath)
Definition: exec.c:160
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:429
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition: exec.c:310
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define _(x)
Definition: elog.c:90
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:5739
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7023
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7200
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7137
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4879
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:745
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4310
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4304
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define pg_malloc_array(type, count)
Definition: fe_memutils.h:56
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:24
#define required_argument
Definition: getopt_long.h:25
const char * str
long val
Definition: informix.c:689
int i
Definition: isn.c:72
@ CONNECTION_OK
Definition: libpq-fe.h:81
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:120
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:123
exit(1)
void pg_logging_increase_verbosity(void)
Definition: logging.c:185
void pg_logging_init(const char *argv0)
Definition: logging.c:83
void pg_logging_set_level(enum pg_log_level new_level)
Definition: logging.c:176
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_info(...)
Definition: logging.h:124
#define pg_log_warning_hint(...)
Definition: logging.h:121
#define pg_log_info_hint(...)
Definition: logging.h:130
@ PG_LOG_WARNING
Definition: logging.h:38
#define pg_log_warning_detail(...)
Definition: logging.h:118
#define pg_log_error_detail(...)
Definition: logging.h:109
#define pg_log_debug(...)
Definition: logging.h:133
#define pg_fatal(...)
static PQExpBuffer recoveryconfcontents
#define MAXPGPATH
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
#define WAIT_INTERVAL
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static struct LogicalRepInfo * dbinfo
static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
static void stop_standby_server(const char *datadir)
static char * pg_ctl_path
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static char * get_exec_path(const char *argv0, const char *progname)
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static char * subscriber_dir
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
int main(int argc, char **argv)
static char * primary_slot_name
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static pg_prng_state prng_state
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
static void check_data_directory(const char *datadir)
static void usage()
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
@ POSTMASTER_READY
@ POSTMASTER_STILL_STARTING
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
#define USEC_PER_SEC
static char * get_base_conninfo(const char *conninfo, char **dbname)
static bool success
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static bool recovery_ended
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static void drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void disconnect_database(PGconn *conn, bool exit_on_error)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static bool dry_run
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static char * pg_resetwal_path
static char * generate_object_name(PGconn *conn)
static const char * progname
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
static char * argv0
Definition: pg_ctl.c:93
static char * exec_path
Definition: pg_ctl.c:88
PGDLLIMPORT int optind
Definition: getopt.c:51
PGDLLIMPORT char * optarg
Definition: getopt.c:53
uint32 pg_prng_uint32(pg_prng_state *state)
Definition: pg_prng.c:227
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition: pg_prng.c:89
char * datadir
NameData subname
static char * buf
Definition: pg_test_fsync.c:72
#define pg_log_warning(...)
Definition: pgfnames.c:24
void canonicalize_path(char *path)
Definition: path.c:265
const char * get_progname(const char *argv0)
Definition: path.c:575
const char * pg_strsignal(int signum)
Definition: pgstrsignal.c:39
#define snprintf
Definition: port.h:238
#define DEVNULL
Definition: port.h:160
#define printf(...)
Definition: port.h:244
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define InvalidOid
Definition: postgres_ext.h:36
unsigned int Oid
Definition: postgres_ext.h:31
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
char * c
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
Definition: recovery_gen.c:124
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
Definition: recovery_gen.c:27
void get_restricted_token(void)
void pg_usleep(long microsec)
Definition: signal.c:53
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition: simple_list.c:87
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition: simple_list.c:63
struct SimpleStringList SimpleStringList
char * dbname
Definition: streamutil.c:50
PGconn * conn
Definition: streamutil.c:53
void appendShellString(PQExpBuffer buf, const char *str)
Definition: string_utils.c:429
void appendConnStrVal(PQExpBuffer buf, const char *str)
Definition: string_utils.c:545
uint64 system_identifier
Definition: pg_control.h:110
SimpleStringList database_names
SimpleStringList replslot_names
char val[FLEXIBLE_ARRAY_MEMBER]
Definition: simple_list.h:37
struct SimpleStringListCell * next
Definition: simple_list.h:34
SimpleStringListCell * head
Definition: simple_list.h:42
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:33
#define stat
Definition: win32_port.h:284
#define WIFEXITED(w)
Definition: win32_port.h:152
#define WIFSIGNALED(w)
Definition: win32_port.h:153
#define WTERMSIG(w)
Definition: win32_port.h:155
#define WEXITSTATUS(w)
Definition: win32_port.h:154
int gettimeofday(struct timeval *tp, void *tzp)
int wal_level
Definition: xlog.c:131
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28