PostgreSQL Source Code  git master
pg_createsubscriber.c File Reference
#include "postgres_fe.h"
#include <sys/time.h>
#include <sys/wait.h>
#include "catalog/pg_authid_d.h"
#include "common/connect.h"
#include "common/controldata_utils.h"
#include "common/file_perm.h"
#include "common/logging.h"
#include "common/pg_prng.h"
#include "common/restricted_token.h"
#include "fe_utils/recovery_gen.h"
#include "fe_utils/simple_list.h"
#include "getopt_long.h"
Include dependency graph for pg_createsubscriber.c:

Go to the source code of this file.

Data Structures

struct  CreateSubscriberOptions
 
struct  LogicalRepInfo
 

Macros

#define DEFAULT_SUB_PORT   "50432"
 
#define USEC_PER_SEC   1000000
 
#define WAIT_INTERVAL   1 /* 1 second */
 
#define NUM_CONN_ATTEMPTS   10
 

Enumerations

enum  WaitPMResult {
  POSTMASTER_READY , POSTMASTER_STILL_STARTING ,
  POSTMASTER_FAILED
}
 

Functions

static void cleanup_objects_atexit (void)
 
static void usage ()
 
static char * get_base_conninfo (const char *conninfo, char **dbname)
 
static char * get_sub_conninfo (const struct CreateSubscriberOptions *opt)
 
static char * get_exec_path (const char *argv0, const char *progname)
 
static void check_data_directory (const char *datadir)
 
static char * concat_conninfo_dbname (const char *conninfo, const char *dbname)
 
static struct LogicalRepInfo * store_pub_sub_info (const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
 
static PGconn * connect_database (const char *conninfo, bool exit_on_error)
 
static void disconnect_database (PGconn *conn, bool exit_on_error)
 
static uint64 get_primary_sysid (const char *conninfo)
 
static uint64 get_standby_sysid (const char *datadir)
 
static void modify_subscriber_sysid (const struct CreateSubscriberOptions *opt)
 
static bool server_is_in_recovery (PGconn *conn)
 
static char * generate_object_name (PGconn *conn)
 
static void check_publisher (const struct LogicalRepInfo *dbinfo)
 
static char * setup_publisher (struct LogicalRepInfo *dbinfo)
 
static void check_subscriber (const struct LogicalRepInfo *dbinfo)
 
static void setup_subscriber (struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 
static void setup_recovery (const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
 
static void drop_primary_replication_slot (struct LogicalRepInfo *dbinfo, const char *slotname)
 
static char * create_logical_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void drop_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
 
static void pg_ctl_status (const char *pg_ctl_cmd, int rc)
 
static void start_standby_server (const struct CreateSubscriberOptions *opt, bool restricted_access)
 
static void stop_standby_server (const char *datadir)
 
static void wait_for_end_recovery (const char *conninfo, const struct CreateSubscriberOptions *opt)
 
static void create_publication (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void drop_publication (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void create_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void set_replication_progress (PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
 
static void enable_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
int main (int argc, char **argv)
 

Variables

static const char * progname
 
static char * primary_slot_name = NULL
 
static bool dry_run = false
 
static bool success = false
 
static struct LogicalRepInfo * dbinfo
 
static int num_dbs = 0
 
static int num_pubs = 0
 
static int num_subs = 0
 
static int num_replslots = 0
 
static pg_prng_state prng_state
 
static char * pg_ctl_path = NULL
 
static char * pg_resetwal_path = NULL
 
static char * subscriber_dir = NULL
 
static bool recovery_ended = false
 
static bool standby_running = false
 

Macro Definition Documentation

◆ DEFAULT_SUB_PORT

#define DEFAULT_SUB_PORT   "50432"

Definition at line 31 of file pg_createsubscriber.c.

◆ NUM_CONN_ATTEMPTS

#define NUM_CONN_ATTEMPTS   10

◆ USEC_PER_SEC

#define USEC_PER_SEC   1000000

Definition at line 105 of file pg_createsubscriber.c.

◆ WAIT_INTERVAL

#define WAIT_INTERVAL   1 /* 1 second */

Definition at line 106 of file pg_createsubscriber.c.

Enumeration Type Documentation

◆ WaitPMResult

Enumerator
POSTMASTER_READY 
POSTMASTER_STILL_STARTING 
POSTMASTER_FAILED 

Definition at line 132 of file pg_createsubscriber.c.

133 {
134  POSTMASTER_READY,
135  POSTMASTER_STILL_STARTING
136 };
@ POSTMASTER_READY
@ POSTMASTER_STILL_STARTING

Function Documentation

◆ check_data_directory()

static void check_data_directory ( const char *  datadir)
static

Definition at line 364 of file pg_createsubscriber.c.

365 {
366  struct stat statbuf;
367  char versionfile[MAXPGPATH];
368 
369  pg_log_info("checking if directory \"%s\" is a cluster data directory",
370  datadir);
371 
372  if (stat(datadir, &statbuf) != 0)
373  {
374  if (errno == ENOENT)
375  pg_fatal("data directory \"%s\" does not exist", datadir);
376  else
377  pg_fatal("could not access directory \"%s\": %m", datadir);
378  }
379 
380  snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
381  if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
382  {
383  pg_fatal("directory \"%s\" is not a database cluster directory",
384  datadir);
385  }
386 }
#define pg_log_info(...)
Definition: logging.h:124
#define pg_fatal(...)
#define MAXPGPATH
char * datadir
#define snprintf
Definition: port.h:238
#define stat
Definition: win32_port.h:284

References datadir, MAXPGPATH, pg_fatal, pg_log_info, snprintf, and stat.

Referenced by main().

◆ check_publisher()

static void check_publisher ( const struct LogicalRepInfo *  dbinfo)
static

Definition at line 806 of file pg_createsubscriber.c.

807 {
808  PGconn *conn;
809  PGresult *res;
810  bool failed = false;
811 
812  char *wal_level;
813  int max_repslots;
814  int cur_repslots;
815  int max_walsenders;
816  int cur_walsenders;
817 
818  pg_log_info("checking settings on publisher");
819 
820  conn = connect_database(dbinfo[0].pubconninfo, true);
821 
822  /*
823  * If the primary server is in recovery (i.e. cascading replication),
824  * objects (publication) cannot be created because it is read only.
825  */
826  if (server_is_in_recovery(conn))
827  {
828  pg_log_error("primary server cannot be in recovery");
829  disconnect_database(conn, true);
830  }
831 
832  /*------------------------------------------------------------------------
833  * Logical replication requires a few parameters to be set on publisher.
834  * Since these parameters are not a requirement for physical replication,
835  * we should check it to make sure it won't fail.
836  *
837  * - wal_level = logical
838  * - max_replication_slots >= current + number of dbs to be converted
839  * - max_wal_senders >= current + number of dbs to be converted
840  * -----------------------------------------------------------------------
841  */
842  res = PQexec(conn,
843  "WITH wl AS "
844  "(SELECT setting AS wallevel FROM pg_catalog.pg_settings "
845  "WHERE name = 'wal_level'), "
846  "total_mrs AS "
847  "(SELECT setting AS tmrs FROM pg_catalog.pg_settings "
848  "WHERE name = 'max_replication_slots'), "
849  "cur_mrs AS "
850  "(SELECT count(*) AS cmrs "
851  "FROM pg_catalog.pg_replication_slots), "
852  "total_mws AS "
853  "(SELECT setting AS tmws FROM pg_catalog.pg_settings "
854  "WHERE name = 'max_wal_senders'), "
855  "cur_mws AS "
856  "(SELECT count(*) AS cmws FROM pg_catalog.pg_stat_activity "
857  "WHERE backend_type = 'walsender') "
858  "SELECT wallevel, tmrs, cmrs, tmws, cmws "
859  "FROM wl, total_mrs, cur_mrs, total_mws, cur_mws");
860 
861  if (PQresultStatus(res) != PGRES_TUPLES_OK)
862  {
863  pg_log_error("could not obtain publisher settings: %s",
864  PQresultErrorMessage(res));
865  disconnect_database(conn, true);
866  }
867 
868  wal_level = pg_strdup(PQgetvalue(res, 0, 0));
869  max_repslots = atoi(PQgetvalue(res, 0, 1));
870  cur_repslots = atoi(PQgetvalue(res, 0, 2));
871  max_walsenders = atoi(PQgetvalue(res, 0, 3));
872  cur_walsenders = atoi(PQgetvalue(res, 0, 4));
873 
874  PQclear(res);
875 
876  pg_log_debug("publisher: wal_level: %s", wal_level);
877  pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
878  pg_log_debug("publisher: current replication slots: %d", cur_repslots);
879  pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
880  pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
881 
882  /*
883  * If standby sets primary_slot_name, check if this replication slot is in
884  * use on primary for WAL retention purposes. This replication slot has no
885  * use after the transformation, hence, it will be removed at the end of
886  * this process.
887  */
888  if (primary_slot_name)
889  {
890  PQExpBuffer str = createPQExpBuffer();
891  char *psn_esc = PQescapeLiteral(conn, primary_slot_name, strlen(primary_slot_name));
892 
893  appendPQExpBuffer(str,
894  "SELECT 1 FROM pg_catalog.pg_replication_slots "
895  "WHERE active AND slot_name = %s",
896  psn_esc);
897 
898  pg_free(psn_esc);
899 
900  pg_log_debug("command is: %s", str->data);
901 
902  res = PQexec(conn, str->data);
903  if (PQresultStatus(res) != PGRES_TUPLES_OK)
904  {
905  pg_log_error("could not obtain replication slot information: %s",
906  PQresultErrorMessage(res));
907  disconnect_database(conn, true);
908  }
909 
910  if (PQntuples(res) != 1)
911  {
912  pg_log_error("could not obtain replication slot information: got %d rows, expected %d row",
913  PQntuples(res), 1);
914  disconnect_database(conn, true);
915  }
916  else
917  pg_log_info("primary has replication slot \"%s\"",
918  primary_slot_name);
919 
920  PQclear(res);
921  }
922 
923  disconnect_database(conn, false);
924 
925  if (strcmp(wal_level, "logical") != 0)
926  {
927  pg_log_error("publisher requires wal_level >= \"logical\"");
928  failed = true;
929  }
930 
931  if (max_repslots - cur_repslots < num_dbs)
932  {
933  pg_log_error("publisher requires %d replication slots, but only %d remain",
934  num_dbs, max_repslots - cur_repslots);
935  pg_log_error_hint("Consider increasing max_replication_slots to at least %d.",
936  cur_repslots + num_dbs);
937  failed = true;
938  }
939 
940  if (max_walsenders - cur_walsenders < num_dbs)
941  {
942  pg_log_error("publisher requires %d wal sender processes, but only %d remain",
943  num_dbs, max_walsenders - cur_walsenders);
944  pg_log_error_hint("Consider increasing max_wal_senders to at least %d.",
945  cur_walsenders + num_dbs);
946  failed = true;
947  }
948 
949  pg_free(wal_level);
950 
951  if (failed)
952  exit(1);
953 }
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4304
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
const char * str
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:103
exit(1)
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_debug(...)
Definition: logging.h:133
static struct LogicalRepInfo * dbinfo
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static char * primary_slot_name
static void disconnect_database(PGconn *conn, bool exit_on_error)
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
PGconn * conn
Definition: streamutil.c:55
int wal_level
Definition: xlog.c:131

References appendPQExpBuffer(), conn, connect_database(), createPQExpBuffer(), dbinfo, disconnect_database(), exit(), num_dbs, pg_free(), pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), primary_slot_name, res, server_is_in_recovery(), str, and wal_level.

Referenced by main().

◆ check_subscriber()

static void check_subscriber ( const struct LogicalRepInfo *  dbinfo)
static

Definition at line 967 of file pg_createsubscriber.c.

968 {
969  PGconn *conn;
970  PGresult *res;
971  bool failed = false;
972 
973  int max_lrworkers;
974  int max_repslots;
975  int max_wprocs;
976 
977  pg_log_info("checking settings on subscriber");
978 
979  conn = connect_database(dbinfo[0].subconninfo, true);
980 
981  /* The target server must be a standby */
982  if (!server_is_in_recovery(conn))
983  {
984  pg_log_error("target server must be a standby");
985  disconnect_database(conn, true);
986  }
987 
988  /*------------------------------------------------------------------------
989  * Logical replication requires a few parameters to be set on subscriber.
990  * Since these parameters are not a requirement for physical replication,
991  * we should check it to make sure it won't fail.
992  *
993  * - max_replication_slots >= number of dbs to be converted
994  * - max_logical_replication_workers >= number of dbs to be converted
995  * - max_worker_processes >= 1 + number of dbs to be converted
996  *------------------------------------------------------------------------
997  */
998  res = PQexec(conn,
999  "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1000  "'max_logical_replication_workers', "
1001  "'max_replication_slots', "
1002  "'max_worker_processes', "
1003  "'primary_slot_name') "
1004  "ORDER BY name");
1005 
1006  if (PQresultStatus(res) != PGRES_TUPLES_OK)
1007  {
1008  pg_log_error("could not obtain subscriber settings: %s",
1009  PQresultErrorMessage(res));
1010  disconnect_database(conn, true);
1011  }
1012 
1013  max_lrworkers = atoi(PQgetvalue(res, 0, 0));
1014  max_repslots = atoi(PQgetvalue(res, 1, 0));
1015  max_wprocs = atoi(PQgetvalue(res, 2, 0));
1016  if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1017  primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
1018 
1019  pg_log_debug("subscriber: max_logical_replication_workers: %d",
1020  max_lrworkers);
1021  pg_log_debug("subscriber: max_replication_slots: %d", max_repslots);
1022  pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1023  if (primary_slot_name)
1024  pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1025 
1026  PQclear(res);
1027 
1028  disconnect_database(conn, false);
1029 
1030  if (max_repslots < num_dbs)
1031  {
1032  pg_log_error("subscriber requires %d replication slots, but only %d remain",
1033  num_dbs, max_repslots);
1034  pg_log_error_hint("Consider increasing max_replication_slots to at least %d.",
1035  num_dbs);
1036  failed = true;
1037  }
1038 
1039  if (max_lrworkers < num_dbs)
1040  {
1041  pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1042  num_dbs, max_lrworkers);
1043  pg_log_error_hint("Consider increasing max_logical_replication_workers to at least %d.",
1044  num_dbs);
1045  failed = true;
1046  }
1047 
1048  if (max_wprocs < num_dbs + 1)
1049  {
1050  pg_log_error("subscriber requires %d worker processes, but only %d remain",
1051  num_dbs + 1, max_wprocs);
1052  pg_log_error_hint("Consider increasing max_worker_processes to at least %d.",
1053  num_dbs + 1);
1054  failed = true;
1055  }
1056 
1057  if (failed)
1058  exit(1);
1059 }

References conn, connect_database(), dbinfo, disconnect_database(), exit(), num_dbs, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQresultErrorMessage(), PQresultStatus(), primary_slot_name, res, and server_is_in_recovery().

Referenced by main().

◆ cleanup_objects_atexit()

static void cleanup_objects_atexit ( void  )
static

Definition at line 151 of file pg_createsubscriber.c.

152 {
153  if (success)
154  return;
155 
156  /*
157  * If the server is promoted, there is no way to use the current setup
158  * again. Warn the user that a new replication setup should be done before
159  * trying again.
160  */
161  if (recovery_ended)
162  {
163  pg_log_warning("failed after the end of recovery");
164  pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
165  "You must recreate the physical replica before continuing.");
166  }
167 
168  for (int i = 0; i < num_dbs; i++)
169  {
170  if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
171  {
172  PGconn *conn;
173 
174  conn = connect_database(dbinfo[i].pubconninfo, false);
175  if (conn != NULL)
176  {
177  if (dbinfo[i].made_publication)
178  drop_publication(conn, &dbinfo[i]);
179  if (dbinfo[i].made_replslot)
180  drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
181  disconnect_database(conn, false);
182  }
183  else
184  {
185  /*
186  * If a connection could not be established, inform the user
187  * that some objects were left on primary and should be
188  * removed before trying again.
189  */
190  if (dbinfo[i].made_publication)
191  {
192  pg_log_warning("publication \"%s\" in database \"%s\" on primary might be left behind",
193  dbinfo[i].pubname, dbinfo[i].dbname);
194  pg_log_warning_hint("Consider dropping this publication before trying again.");
195  }
196  if (dbinfo[i].made_replslot)
197  {
198  pg_log_warning("replication slot \"%s\" in database \"%s\" on primary might be left behind",
199  dbinfo[i].replslotname, dbinfo[i].dbname);
200  pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
201  }
202  }
203  }
204  }
205 
206  if (standby_running)
207  stop_standby_server(subscriber_dir);
208 }
int i
Definition: isn.c:73
#define pg_log_warning_hint(...)
Definition: logging.h:121
static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void stop_standby_server(const char *datadir)
static char * subscriber_dir
static bool success
static bool recovery_ended
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
#define pg_log_warning(...)
Definition: pgfnames.c:24
char * dbname
Definition: streamutil.c:52

References conn, connect_database(), dbinfo, dbname, disconnect_database(), drop_publication(), drop_replication_slot(), i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, num_dbs, pg_log_warning, pg_log_warning_hint, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, recovery_ended, LogicalRepInfo::replslotname, standby_running, stop_standby_server(), subscriber_dir, and success.

Referenced by main().

◆ concat_conninfo_dbname()

static char * concat_conninfo_dbname ( const char *  conninfo,
const char *  dbname 
)
static

Definition at line 396 of file pg_createsubscriber.c.

397 {
398  PQExpBuffer buf = createPQExpBuffer();
399  char *ret;
400 
401  Assert(conninfo != NULL);
402 
403  appendPQExpBufferStr(buf, conninfo);
404  appendPQExpBuffer(buf, " dbname=%s", dbname);
405 
406  ret = pg_strdup(buf->data);
407  destroyPQExpBuffer(buf);
408 
409  return ret;
410 }
#define Assert(condition)
Definition: c.h:858
static char * buf
Definition: pg_test_fsync.c:73
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367

References appendPQExpBuffer(), appendPQExpBufferStr(), Assert, buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), and pg_strdup().

Referenced by store_pub_sub_info().

◆ connect_database()

static PGconn * connect_database ( const char *  conninfo,
bool  exit_on_error 
)
static

Definition at line 492 of file pg_createsubscriber.c.

493 {
494  PGconn *conn;
495  PGresult *res;
496 
497  conn = PQconnectdb(conninfo);
498  if (PQstatus(conn) != CONNECTION_OK)
499  {
500  pg_log_error("connection to database failed: %s",
501  PQerrorMessage(conn));
502  PQfinish(conn);
503 
504  if (exit_on_error)
505  exit(1);
506  return NULL;
507  }
508 
509  /* Secure search_path */
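510  res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
511  if (PQresultStatus(res) != PGRES_TUPLES_OK)
512  {
513  pg_log_error("could not clear search_path: %s",
514  PQresultErrorMessage(res));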
515  PQclear(res);
516  PQfinish(conn);
517 
518  if (exit_on_error)
519  exit(1);
520  return NULL;
521  }
522  PQclear(res);
523 
524  return conn;
525 }
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7147
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7094
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4868
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:744
@ CONNECTION_OK
Definition: libpq-fe.h:61

References ALWAYS_SECURE_SEARCH_PATH_SQL, conn, CONNECTION_OK, exit(), pg_log_error, PGRES_TUPLES_OK, PQclear(), PQconnectdb(), PQerrorMessage(), PQexec(), PQfinish(), PQresultErrorMessage(), PQresultStatus(), PQstatus(), and res.

Referenced by check_publisher(), check_subscriber(), cleanup_objects_atexit(), drop_primary_replication_slot(), get_primary_sysid(), setup_publisher(), setup_recovery(), setup_subscriber(), and wait_for_end_recovery().

◆ create_logical_replication_slot()

static char * create_logical_replication_slot ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1188 of file pg_createsubscriber.c.

1189 {
1190  PQExpBuffer str = createPQExpBuffer();
1191  PGresult *res = NULL;
1192  const char *slot_name = dbinfo->replslotname;
1193  char *slot_name_esc;
1194  char *lsn = NULL;
1195 
1196  Assert(conn != NULL);
1197 
1198  pg_log_info("creating the replication slot \"%s\" on database \"%s\"",
1199  slot_name, dbinfo->dbname);
1200 
1201  slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1202 
1203  appendPQExpBuffer(str,
1204  "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
1205  slot_name_esc);
1206 
1207  pg_free(slot_name_esc);
1208 
1209  pg_log_debug("command is: %s", str->data);
1210 
1211  if (!dry_run)
1212  {
1213  res = PQexec(conn, str->data);
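1214  if (PQresultStatus(res) != PGRES_TUPLES_OK)
1215  {
1216  pg_log_error("could not create replication slot \"%s\" on database \"%s\": %s",
1217  slot_name, dbinfo->dbname,
1218  PQresultErrorMessage(res));
1219  PQclear(res);
1220  destroyPQExpBuffer(str);
1221  return NULL;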
1222  }
1223 
1224  lsn = pg_strdup(PQgetvalue(res, 0, 0));
1225  PQclear(res);
1226  }
1227 
1228  /* For cleanup purposes */
1229  dbinfo->made_replslot = true;
1230 
1231  destroyPQExpBuffer(str);
1232 
1233  return lsn;
1234 }
static bool dry_run

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_replslot, pg_free(), pg_log_debug, pg_log_error, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQgetvalue(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::replslotname, res, and str.

Referenced by setup_publisher().

◆ create_publication()

static void create_publication ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1443 of file pg_createsubscriber.c.

1444 {
1445  PQExpBuffer str = createPQExpBuffer();
1446  PGresult *res;
1447  char *ipubname_esc;
1448  char *spubname_esc;
1449 
1450  Assert(conn != NULL);
1451 
1452  ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1453  spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1454 
1455  /* Check if the publication already exists */
1456  appendPQExpBuffer(str,
1457  "SELECT 1 FROM pg_catalog.pg_publication "
1458  "WHERE pubname = %s",
1459  spubname_esc);
1460  res = PQexec(conn, str->data);
1461  if (PQresultStatus(res) != PGRES_TUPLES_OK)
1462  {
1463  pg_log_error("could not obtain publication information: %s",
1464  PQresultErrorMessage(res));
1465  disconnect_database(conn, true);
1466  }
1467 
1468  if (PQntuples(res) == 1)
1469  {
1470  /*
1471  * Unfortunately, if it reaches this code path, it will always fail
1472  * (unless you decide to change the existing publication name). That's
1473  * bad but it is very unlikely that the user will choose a name with
1474  * pg_createsubscriber_ prefix followed by the exact database oid and
1475  * a random number.
1476  */
1477  pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1478  pg_log_error_hint("Consider renaming this publication before continuing.");
1479  disconnect_database(conn, true);
1480  }
1481 
1482  PQclear(res);
1483  resetPQExpBuffer(str);
1484 
1485  pg_log_info("creating publication \"%s\" on database \"%s\"",
1486  dbinfo->pubname, dbinfo->dbname);
1487 
1488  appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1489  ipubname_esc);
1490 
1491  pg_log_debug("command is: %s", str->data);
1492 
1493  if (!dry_run)
1494  {
1495  res = PQexec(conn, str->data);
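1496  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1497  {
1498  pg_log_error("could not create publication \"%s\" on database \"%s\": %s",
1499  dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));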
1500  disconnect_database(conn, true);
1501  }
1502  PQclear(res);
1503  }
1504 
1505  /* For cleanup purposes */
1506  dbinfo->made_publication = true;
1507 
1508  pg_free(ipubname_esc);
1509  pg_free(spubname_esc);
1510  destroyPQExpBuffer(str);
1511 }
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4310
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:100
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, LogicalRepInfo::made_publication, pg_free(), pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::pubname, res, resetPQExpBuffer(), and str.

Referenced by setup_publisher().

◆ create_subscription()

static void create_subscription ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1571 of file pg_createsubscriber.c.

1572 {
1573  PQExpBuffer str = createPQExpBuffer();
1574  PGresult *res;
1575  char *pubname_esc;
1576  char *subname_esc;
1577  char *pubconninfo_esc;
1578  char *replslotname_esc;
1579 
1580  Assert(conn != NULL);
1581 
1582  pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1583  subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1584  pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1585  replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1586 
1587  pg_log_info("creating subscription \"%s\" on database \"%s\"",
1588  dbinfo->subname, dbinfo->dbname);
1589 
1590  appendPQExpBuffer(str,
1591  "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1592  "WITH (create_slot = false, enabled = false, "
1593  "slot_name = %s, copy_data = false)",
1594  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
1595 
1596  pg_free(pubname_esc);
1597  pg_free(subname_esc);
1598  pg_free(pubconninfo_esc);
1599  pg_free(replslotname_esc);
1600 
1601  pg_log_debug("command is: %s", str->data);
1602 
1603  if (!dry_run)
1604  {
1605  res = PQexec(conn, str->data);
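1606  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1607  {
1608  pg_log_error("could not create subscription \"%s\" on database \"%s\": %s",
1609  dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));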
1610  disconnect_database(conn, true);
1611  }
1612  PQclear(res);
1613  }
1614 
1615  destroyPQExpBuffer(str);
1616 }

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, res, str, and LogicalRepInfo::subname.

Referenced by setup_subscriber().

◆ disconnect_database()

◆ drop_primary_replication_slot()

static void drop_primary_replication_slot ( struct LogicalRepInfo *  dbinfo,
const char *  slotname 
)
static

Definition at line 1159 of file pg_createsubscriber.c.

1160 {
1161  PGconn *conn;
1162 
1163  /* Replication slot does not exist, do nothing */
1164  if (!primary_slot_name)
1165  return;
1166 
1167  conn = connect_database(dbinfo[0].pubconninfo, false);
1168  if (conn != NULL)
1169  {
1170  drop_replication_slot(conn, &dbinfo[0], slotname);
1171  disconnect_database(conn, false);
1172  }
1173  else
1174  {
1175  pg_log_warning("could not drop replication slot \"%s\" on primary",
1176  slotname);
1177  pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1178  }
1179 }

References conn, connect_database(), dbinfo, disconnect_database(), drop_replication_slot(), pg_log_warning, pg_log_warning_hint, and primary_slot_name.

Referenced by main().

◆ drop_publication()

static void drop_publication ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1517 of file pg_createsubscriber.c.

1518 {
1519  PQExpBuffer str = createPQExpBuffer();
1520  PGresult *res;
1521  char *pubname_esc;
1522 
1523  Assert(conn != NULL);
1524 
1525  pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1526 
1527  pg_log_info("dropping publication \"%s\" on database \"%s\"",
1528  dbinfo->pubname, dbinfo->dbname);
1529 
1530  appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1531 
1532  pg_free(pubname_esc);
1533 
1534  pg_log_debug("command is: %s", str->data);
1535 
1536  if (!dry_run)
1537  {
1538  res = PQexec(conn, str->data);
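1539  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1540  {
1541  pg_log_error("could not drop publication \"%s\" on database \"%s\": %s",
1542  dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));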
1543  dbinfo->made_publication = false; /* don't try again. */
1544 
1545  /*
1546  * Don't disconnect and exit here. This routine is used by primary
1547  * (cleanup publication / replication slot due to an error) and
1548  * subscriber (remove the replicated publications). In both cases,
1549  * it can continue and provide instructions for the user to remove
1550  * it later if cleanup fails.
1551  */
1552  }
1553  PQclear(res);
1554  }
1555 
1556  destroyPQExpBuffer(str);
1557 }

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_publication, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQescapeIdentifier(), PQexec(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::pubname, res, and str.

Referenced by cleanup_objects_atexit(), and setup_subscriber().

◆ drop_replication_slot()

static void drop_replication_slot ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo,
const char *  slot_name 
)
static

Definition at line 1237 of file pg_createsubscriber.c.

1239 {
1240  PQExpBuffer str = createPQExpBuffer();
1241  char *slot_name_esc;
1242  PGresult *res;
1243 
1244  Assert(conn != NULL);
1245 
1246  pg_log_info("dropping the replication slot \"%s\" on database \"%s\"",
1247  slot_name, dbinfo->dbname);
1248 
1249  slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1250 
1251  appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1252 
1253  pg_free(slot_name_esc);
1254 
1255  pg_log_debug("command is: %s", str->data);
1256 
1257  if (!dry_run)
1258  {
1259  res = PQexec(conn, str->data);
1260  if (PQresultStatus(res) != PGRES_TUPLES_OK)
1261  {
1262  pg_log_error("could not drop replication slot \"%s\" on database \"%s\": %s",
1263  slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1264  dbinfo->made_replslot = false; /* don't try again. */
1265  }
1266 
1267  PQclear(res);
1268  }
1269 
1270  destroyPQExpBuffer(str);
1271 }

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_replslot, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQresultErrorMessage(), PQresultStatus(), res, and str.

Referenced by cleanup_objects_atexit(), and drop_primary_replication_slot().

◆ enable_subscription()

static void enable_subscription ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1720 of file pg_createsubscriber.c.

1721 {
1722  PQExpBuffer str = createPQExpBuffer();
1723  PGresult *res;
1724  char *subname;
1725 
1726  Assert(conn != NULL);
1727 
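1728  subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1729 
1730  pg_log_info("enabling subscription \"%s\" on database \"%s\"",
1731  dbinfo->subname, dbinfo->dbname);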
1732 
1733  appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1734 
1735  pg_log_debug("command is: %s", str->data);
1736 
1737  if (!dry_run)
1738  {
1739  res = PQexec(conn, str->data);
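1740  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1741  {
1742  pg_log_error("could not enable subscription \"%s\": %s",
1743  dbinfo->subname, PQresultErrorMessage(res));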
1744  disconnect_database(conn, true);
1745  }
1746 
1747  PQclear(res);
1748  }
1749 
1750  pg_free(subname);
1751  destroyPQExpBuffer(str);
1752 }
NameData subname

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQescapeIdentifier(), PQexec(), PQresultErrorMessage(), PQresultStatus(), res, str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ generate_object_name()

static char * generate_object_name ( PGconn *  conn)
static

Definition at line 672 of file pg_createsubscriber.c.

673 {
674  PGresult *res;
675  Oid oid;
676  uint32 rand;
677  char *objname;
678 
679  res = PQexec(conn,
680  "SELECT oid FROM pg_catalog.pg_database "
681  "WHERE datname = pg_catalog.current_database()");
683  {
684  pg_log_error("could not obtain database OID: %s",
686  disconnect_database(conn, true);
687  }
688 
689  if (PQntuples(res) != 1)
690  {
691  pg_log_error("could not obtain database OID: got %d rows, expected %d row",
692  PQntuples(res), 1);
693  disconnect_database(conn, true);
694  }
695 
696  /* Database OID */
697  oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
698 
699  PQclear(res);
700 
701  /* Random unsigned integer */
702  rand = pg_prng_uint32(&prng_state);
703 
704  /*
705  * Build the object name. The name must not exceed NAMEDATALEN - 1. The
706  * current naming scheme uses a maximum of 40 characters (20 + 10 + 1 + 8 +
707  * '\0').
708  */
709  objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
710 
711  return objname;
712 }
unsigned int uint32
Definition: c.h:506
static pg_prng_state prng_state
uint32 pg_prng_uint32(pg_prng_state *state)
Definition: pg_prng.c:227
unsigned int Oid
Definition: postgres_ext.h:31
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46

References conn, disconnect_database(), pg_log_error, pg_prng_uint32(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), prng_state, psprintf(), and res.

Referenced by setup_publisher().

◆ get_base_conninfo()

static char * get_base_conninfo ( const char *  conninfo,
char **  dbname 
)
static

Definition at line 252 of file pg_createsubscriber.c.

253 {
255  PQconninfoOption *conn_opts;
256  PQconninfoOption *conn_opt;
257  char *errmsg = NULL;
258  char *ret;
259  int i;
260 
261  conn_opts = PQconninfoParse(conninfo, &errmsg);
262  if (conn_opts == NULL)
263  {
264  pg_log_error("could not parse connection string: %s", errmsg);
265  PQfreemem(errmsg);
266  return NULL;
267  }
268 
270  i = 0;
271  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
272  {
273  if (strcmp(conn_opt->keyword, "dbname") == 0 && conn_opt->val != NULL)
274  {
275  if (dbname)
276  *dbname = pg_strdup(conn_opt->val);
277  continue;
278  }
279 
280  if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
281  {
282  if (i > 0)
284  appendPQExpBuffer(buf, "%s=%s", conn_opt->keyword, conn_opt->val);
285  i++;
286  }
287  }
288 
289  ret = pg_strdup(buf->data);
290 
292  PQconninfoFree(conn_opts);
293 
294  return ret;
295 }
int errmsg(const char *fmt,...)
Definition: elog.c:1072
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:5728
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:6980
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378

References appendPQExpBuffer(), appendPQExpBufferChar(), buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), errmsg(), i, _PQconninfoOption::keyword, pg_log_error, pg_strdup(), PQconninfoFree(), PQconninfoParse(), PQfreemem(), and _PQconninfoOption::val.

Referenced by main().

◆ get_exec_path()

static char * get_exec_path ( const char *  argv0,
const char *  progname 
)
static

Definition at line 328 of file pg_createsubscriber.c.

329 {
330  char *versionstr;
331  char *exec_path;
332  int ret;
333 
334  versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
336  ret = find_other_exec(argv0, progname, versionstr, exec_path);
337 
338  if (ret < 0)
339  {
340  char full_path[MAXPGPATH];
341 
342  if (find_my_exec(argv0, full_path) < 0)
343  strlcpy(full_path, progname, sizeof(full_path));
344 
345  if (ret == -1)
346  pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
347  progname, "pg_createsubscriber", full_path);
348  else
349  pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
350  progname, full_path, "pg_createsubscriber");
351  }
352 
353  pg_log_debug("%s path is: %s", progname, exec_path);
354 
355  return exec_path;
356 }
int find_my_exec(const char *argv0, char *retpath)
Definition: exec.c:160
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition: exec.c:329
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static const char * progname
static char * argv0
Definition: pg_ctl.c:92
static char * exec_path
Definition: pg_ctl.c:87
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45

References argv0, exec_path, find_my_exec(), find_other_exec(), MAXPGPATH, pg_fatal, pg_log_debug, pg_malloc(), progname, psprintf(), and strlcpy().

Referenced by main().

◆ get_primary_sysid()

static uint64 get_primary_sysid ( const char *  conninfo)
static

Definition at line 547 of file pg_createsubscriber.c.

548 {
549  PGconn *conn;
550  PGresult *res;
551  uint64 sysid;
552 
553  pg_log_info("getting system identifier from publisher");
554 
555  conn = connect_database(conninfo, true);
556 
557  res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
559  {
560  pg_log_error("could not get system identifier: %s",
562  disconnect_database(conn, true);
563  }
564  if (PQntuples(res) != 1)
565  {
566  pg_log_error("could not get system identifier: got %d rows, expected %d row",
567  PQntuples(res), 1);
568  disconnect_database(conn, true);
569  }
570 
571  sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
572 
573  pg_log_info("system identifier is %llu on publisher",
574  (unsigned long long) sysid);
575 
576  PQclear(res);
577  disconnect_database(conn, false);
578 
579  return sysid;
580 }
#define strtou64(str, endptr, base)
Definition: c.h:1298

References conn, connect_database(), disconnect_database(), pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), res, and strtou64.

Referenced by main().

◆ get_standby_sysid()

static uint64 get_standby_sysid ( const char *  datadir)
static

Definition at line 588 of file pg_createsubscriber.c.

/*
 * Read the system identifier stored in the control file of the data
 * directory given by datadir and return it. Calls pg_fatal() when
 * get_controlfile() reports that the CRC check did not pass.
 */
589 {
590  ControlFileData *cf;
591  bool crc_ok;
592  uint64 sysid;
593 
594  pg_log_info("getting system identifier from subscriber");
595 
/* get_controlfile() hands back the control data and sets crc_ok */
596  cf = get_controlfile(datadir, &crc_ok);
597  if (!crc_ok)
598  pg_fatal("control file appears to be corrupt");
599 
/* Copy the value out before the control data is released below */
600  sysid = cf->system_identifier;
601 
602  pg_log_info("system identifier is %llu on subscriber",
603  (unsigned long long) sysid);
604 
605  pg_free(cf);
606 
607  return sysid;
608 }
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
uint64 system_identifier
Definition: pg_control.h:109

References datadir, get_controlfile(), pg_fatal, pg_free(), pg_log_info, and ControlFileData::system_identifier.

Referenced by main().

◆ get_sub_conninfo()

static char * get_sub_conninfo ( const struct CreateSubscriberOptions *  opt)
static

Definition at line 302 of file pg_createsubscriber.c.

303 {
305  char *ret;
306 
307  appendPQExpBuffer(buf, "port=%s", opt->sub_port);
308 #if !defined(WIN32)
309  appendPQExpBuffer(buf, " host=%s", opt->socket_dir);
310 #endif
311  if (opt->sub_username != NULL)
312  appendPQExpBuffer(buf, " user=%s", opt->sub_username);
313  appendPQExpBuffer(buf, " fallback_application_name=%s", progname);
314 
315  ret = pg_strdup(buf->data);
316 
318 
319  return ret;
320 }

References appendPQExpBuffer(), buf, createPQExpBuffer(), destroyPQExpBuffer(), pg_strdup(), progname, CreateSubscriberOptions::socket_dir, CreateSubscriberOptions::sub_port, and CreateSubscriberOptions::sub_username.

Referenced by main().

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 1755 of file pg_createsubscriber.c.

1756 {
1757  static struct option long_options[] =
1758  {
1759  {"database", required_argument, NULL, 'd'},
1760  {"pgdata", required_argument, NULL, 'D'},
1761  {"dry-run", no_argument, NULL, 'n'},
1762  {"subscriber-port", required_argument, NULL, 'p'},
1763  {"publisher-server", required_argument, NULL, 'P'},
1764  {"socket-directory", required_argument, NULL, 's'},
1765  {"recovery-timeout", required_argument, NULL, 't'},
1766  {"subscriber-username", required_argument, NULL, 'U'},
1767  {"verbose", no_argument, NULL, 'v'},
1768  {"version", no_argument, NULL, 'V'},
1769  {"help", no_argument, NULL, '?'},
1770  {"config-file", required_argument, NULL, 1},
1771  {"publication", required_argument, NULL, 2},
1772  {"replication-slot", required_argument, NULL, 3},
1773  {"subscription", required_argument, NULL, 4},
1774  {NULL, 0, NULL, 0}
1775  };
1776 
1777  struct CreateSubscriberOptions opt = {0};
1778 
1779  int c;
1780  int option_index;
1781 
1782  char *pub_base_conninfo;
1783  char *sub_base_conninfo;
1784  char *dbname_conninfo = NULL;
1785 
1786  uint64 pub_sysid;
1787  uint64 sub_sysid;
1788  struct stat statbuf;
1789 
1790  char *consistent_lsn;
1791 
1792  char pidfile[MAXPGPATH];
1793 
1794  pg_logging_init(argv[0]);
1796  progname = get_progname(argv[0]);
1797  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_createsubscriber"));
1798 
1799  if (argc > 1)
1800  {
1801  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
1802  {
1803  usage();
1804  exit(0);
1805  }
1806  else if (strcmp(argv[1], "-V") == 0
1807  || strcmp(argv[1], "--version") == 0)
1808  {
1809  puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
1810  exit(0);
1811  }
1812  }
1813 
1814  /* Default settings */
1815  subscriber_dir = NULL;
1816  opt.config_file = NULL;
1817  opt.pub_conninfo_str = NULL;
1818  opt.socket_dir = NULL;
1819  opt.sub_port = DEFAULT_SUB_PORT;
1820  opt.sub_username = NULL;
1822  {
1823  0
1824  };
1825  opt.recovery_timeout = 0;
1826 
1827  /*
1828  * Don't allow it to be run as root. It uses pg_ctl which does not allow
1829  * it either.
1830  */
1831 #ifndef WIN32
1832  if (geteuid() == 0)
1833  {
1834  pg_log_error("cannot be executed by \"root\"");
1835  pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
1836  progname);
1837  exit(1);
1838  }
1839 #endif
1840 
1842 
1843  while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
1844  long_options, &option_index)) != -1)
1845  {
1846  switch (c)
1847  {
1848  case 'd':
1850  {
1852  num_dbs++;
1853  }
1854  else
1855  {
1856  pg_log_error("duplicate database \"%s\"", optarg);
1857  exit(1);
1858  }
1859  break;
1860  case 'D':
1863  break;
1864  case 'n':
1865  dry_run = true;
1866  break;
1867  case 'p':
1868  opt.sub_port = pg_strdup(optarg);
1869  break;
1870  case 'P':
1872  break;
1873  case 's':
1874  opt.socket_dir = pg_strdup(optarg);
1876  break;
1877  case 't':
1878  opt.recovery_timeout = atoi(optarg);
1879  break;
1880  case 'U':
1881  opt.sub_username = pg_strdup(optarg);
1882  break;
1883  case 'v':
1885  break;
1886  case 1:
1887  opt.config_file = pg_strdup(optarg);
1888  break;
1889  case 2:
1891  {
1893  num_pubs++;
1894  }
1895  else
1896  {
1897  pg_log_error("duplicate publication \"%s\"", optarg);
1898  exit(1);
1899  }
1900  break;
1901  case 3:
1903  {
1905  num_replslots++;
1906  }
1907  else
1908  {
1909  pg_log_error("duplicate replication slot \"%s\"", optarg);
1910  exit(1);
1911  }
1912  break;
1913  case 4:
1915  {
1917  num_subs++;
1918  }
1919  else
1920  {
1921  pg_log_error("duplicate subscription \"%s\"", optarg);
1922  exit(1);
1923  }
1924  break;
1925  default:
1926  /* getopt_long already emitted a complaint */
1927  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
1928  exit(1);
1929  }
1930  }
1931 
1932  /* Any non-option arguments? */
1933  if (optind < argc)
1934  {
1935  pg_log_error("too many command-line arguments (first is \"%s\")",
1936  argv[optind]);
1937  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
1938  exit(1);
1939  }
1940 
1941  /* Required arguments */
1942  if (subscriber_dir == NULL)
1943  {
1944  pg_log_error("no subscriber data directory specified");
1945  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
1946  exit(1);
1947  }
1948 
1949  /* If socket directory is not provided, use the current directory */
1950  if (opt.socket_dir == NULL)
1951  {
1952  char cwd[MAXPGPATH];
1953 
1954  if (!getcwd(cwd, MAXPGPATH))
1955  pg_fatal("could not determine current directory");
1956  opt.socket_dir = pg_strdup(cwd);
1958  }
1959 
1960  /*
1961  * Parse connection string. Build a base connection string that might be
1962  * reused by multiple databases.
1963  */
1964  if (opt.pub_conninfo_str == NULL)
1965  {
1966  /*
1967  * TODO use primary_conninfo (if available) from subscriber and
1968  * extract publisher connection string. Assume that there are
1969  * identical entries for physical and logical replication. If there is
1970  * not, we would fail anyway.
1971  */
1972  pg_log_error("no publisher connection string specified");
1973  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
1974  exit(1);
1975  }
1976  pg_log_info("validating connection string on publisher");
1977  pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
1978  &dbname_conninfo);
1979  if (pub_base_conninfo == NULL)
1980  exit(1);
1981 
1982  pg_log_info("validating connection string on subscriber");
1983  sub_base_conninfo = get_sub_conninfo(&opt);
1984 
1985  if (opt.database_names.head == NULL)
1986  {
1987  pg_log_info("no database was specified");
1988 
1989  /*
1990  * If --database option is not provided, try to obtain the dbname from
1991  * the publisher conninfo. If dbname parameter is not available, error
1992  * out.
1993  */
1994  if (dbname_conninfo)
1995  {
1996  simple_string_list_append(&opt.database_names, dbname_conninfo);
1997  num_dbs++;
1998 
1999  pg_log_info("database \"%s\" was extracted from the publisher connection string",
2000  dbname_conninfo);
2001  }
2002  else
2003  {
2004  pg_log_error("no database name specified");
2005  pg_log_error_hint("Try \"%s --help\" for more information.",
2006  progname);
2007  exit(1);
2008  }
2009  }
2010 
2011  /* Number of object names must match number of databases */
2012  if (num_pubs > 0 && num_pubs != num_dbs)
2013  {
2014  pg_log_error("wrong number of publication names");
2015  pg_log_error_hint("Number of publication names (%d) must match number of database names (%d).",
2016  num_pubs, num_dbs);
2017  exit(1);
2018  }
2019  if (num_subs > 0 && num_subs != num_dbs)
2020  {
2021  pg_log_error("wrong number of subscription names");
2022  pg_log_error_hint("Number of subscription names (%d) must match number of database names (%d).",
2023  num_subs, num_dbs);
2024  exit(1);
2025  }
2026  if (num_replslots > 0 && num_replslots != num_dbs)
2027  {
2028  pg_log_error("wrong number of replication slot names");
2029  pg_log_error_hint("Number of replication slot names (%d) must match number of database names (%d).",
2031  exit(1);
2032  }
2033 
2034  /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2035  pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2036  pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2037 
2038  /* Rudimentary check for a data directory */
2040 
2041  /*
2042  * Store database information for publisher and subscriber. It should be
2043  * called before atexit() because its return value is used by
2044  * cleanup_objects_atexit().
2045  */
2046  dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2047 
2048  /* Register a function to clean up objects in case of failure */
2049  atexit(cleanup_objects_atexit);
2050 
2051  /*
2052  * Check if the subscriber data directory has the same system identifier
2053  * as the publisher data directory.
2054  */
2055  pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
2056  sub_sysid = get_standby_sysid(subscriber_dir);
2057  if (pub_sysid != sub_sysid)
2058  pg_fatal("subscriber data directory is not a copy of the source database cluster");
2059 
2060  /* Subscriber PID file */
2061  snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2062 
2063  /*
2064  * The standby server must not be running. If the server is started under
2065  * a service manager and pg_createsubscriber stops it, the service manager
2066  * might react to this action and start the server again. Therefore,
2067  * refuse to proceed if the server is running to avoid possible failures.
2068  */
2069  if (stat(pidfile, &statbuf) == 0)
2070  {
2071  pg_log_error("standby is up and running");
2072  pg_log_error_hint("Stop the standby and try again.");
2073  exit(1);
2074  }
2075 
2076  /*
2077  * Start a short-lived standby server with temporary parameters (provided
2078  * by command-line options). The goal is to avoid connections during the
2079  * transformation steps.
2080  */
2081  pg_log_info("starting the standby with command-line options");
2082  start_standby_server(&opt, true);
2083 
2084  /* Check if the standby server is ready for logical replication */
2086 
2087  /*
2088  * Check if the primary server is ready for logical replication. This
2089  * routine checks if a replication slot is in use on primary so it relies
2090  * on check_subscriber() to obtain the primary_slot_name. That's why it is
2091  * called after it.
2092  */
2094 
2095  /*
2096  * Stop the target server. The recovery process requires that the server
2097  * reaches a consistent state before targeting the recovery stop point.
2098  * Make sure a consistent state is reached (stopping the target server
2099  * guarantees it) *before* creating the replication slots in
2100  * setup_publisher().
2101  */
2102  pg_log_info("stopping the subscriber");
2104 
2105  /*
2106  * Create the required objects for each database on publisher. This step
2107  * is here mainly because if we stop the standby we cannot verify if the
2108  * primary slot is in use. We could use an extra connection for it but it
2109  * doesn't seem worth it.
2110  */
2111  consistent_lsn = setup_publisher(dbinfo);
2112 
2113  /* Write the required recovery parameters */
2114  setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
2115 
2116  /*
2117  * Start subscriber so the recovery parameters will take effect. Wait
2118  * until accepting connections.
2119  */
2120  pg_log_info("starting the subscriber");
2121  start_standby_server(&opt, true);
2122 
2123  /* Wait for the subscriber to be promoted */
2124  wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
2125 
2126  /*
2127  * Create the subscription for each database on subscriber. It does not
2128  * enable it immediately because it needs to adjust the replication start
2129  * point to the LSN reported by setup_publisher(). It also cleans up
2130  * publications created by this tool and replication to the standby.
2131  */
2132  setup_subscriber(dbinfo, consistent_lsn);
2133 
2134  /* Remove primary_slot_name if it exists on primary */
2136 
2137  /* Stop the subscriber */
2138  pg_log_info("stopping the subscriber");
2140 
2141  /* Change the system identifier of the subscriber */
2143 
2144  success = true;
2145 
2146  pg_log_info("Done!");
2147 
2148  return 0;
2149 }
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1214
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:448
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:24
#define required_argument
Definition: getopt_long.h:25
void pg_logging_increase_verbosity(void)
Definition: logging.c:182
void pg_logging_init(const char *argv0)
Definition: logging.c:83
void pg_logging_set_level(enum pg_log_level new_level)
Definition: logging.c:173
@ PG_LOG_WARNING
Definition: logging.h:38
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
static char * pg_ctl_path
static char * get_exec_path(const char *argv0, const char *progname)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static void check_data_directory(const char *datadir)
static void usage()
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static char * pg_resetwal_path
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
PGDLLIMPORT int optind
Definition: getopt.c:50
PGDLLIMPORT char * optarg
Definition: getopt.c:52
void canonicalize_path(char *path)
Definition: path.c:264
const char * get_progname(const char *argv0)
Definition: path.c:574
char * c
void get_restricted_token(void)
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition: simple_list.c:87
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition: simple_list.c:63
struct SimpleStringList SimpleStringList
SimpleStringList database_names
SimpleStringList replslot_names
SimpleStringListCell * head
Definition: simple_list.h:42

References canonicalize_path(), check_data_directory(), check_publisher(), check_subscriber(), cleanup_objects_atexit(), CreateSubscriberOptions::config_file, CreateSubscriberOptions::database_names, dbinfo, DEFAULT_SUB_PORT, drop_primary_replication_slot(), dry_run, exit(), get_base_conninfo(), get_exec_path(), get_primary_sysid(), get_progname(), get_restricted_token(), get_standby_sysid(), get_sub_conninfo(), getopt_long(), SimpleStringList::head, MAXPGPATH, modify_subscriber_sysid(), no_argument, num_dbs, num_pubs, num_replslots, num_subs, optarg, optind, pg_ctl_path, pg_fatal, pg_log_error, pg_log_error_hint, pg_log_info, PG_LOG_WARNING, pg_logging_increase_verbosity(), pg_logging_init(), pg_logging_set_level(), pg_resetwal_path, pg_strdup(), PG_TEXTDOMAIN, primary_slot_name, progname, CreateSubscriberOptions::pub_conninfo_str, CreateSubscriberOptions::pub_names, CreateSubscriberOptions::recovery_timeout, CreateSubscriberOptions::replslot_names, required_argument, set_pglocale_pgservice(), setup_publisher(), setup_recovery(), setup_subscriber(), simple_string_list_append(), simple_string_list_member(), snprintf, CreateSubscriberOptions::socket_dir, start_standby_server(), stat, stop_standby_server(), store_pub_sub_info(), CreateSubscriberOptions::sub_names, CreateSubscriberOptions::sub_port, CreateSubscriberOptions::sub_username, subscriber_dir, success, usage(), and wait_for_end_recovery().

◆ modify_subscriber_sysid()

static void modify_subscriber_sysid ( const struct CreateSubscriberOptions *  opt)
static

Definition at line 616 of file pg_createsubscriber.c.

617 {
618  ControlFileData *cf;
619  bool crc_ok;
620  struct timeval tv;
621 
622  char *cmd_str;
623 
624  pg_log_info("modifying system identifier of subscriber");
625 
626  cf = get_controlfile(subscriber_dir, &crc_ok);
627  if (!crc_ok)
628  pg_fatal("control file appears to be corrupt");
629 
630  /*
631  * Select a new system identifier.
632  *
633  * XXX this code was extracted from BootStrapXLOG().
634  */
635  gettimeofday(&tv, NULL);
636  cf->system_identifier = ((uint64) tv.tv_sec) << 32;
637  cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
638  cf->system_identifier |= getpid() & 0xFFF;
639 
640  if (!dry_run)
642 
643  pg_log_info("system identifier is %llu on subscriber",
644  (unsigned long long) cf->system_identifier);
645 
646  pg_log_info("running pg_resetwal on the subscriber");
647 
648  cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
650 
651  pg_log_debug("pg_resetwal command is: %s", cmd_str);
652 
653  if (!dry_run)
654  {
655  int rc = system(cmd_str);
656 
657  if (rc == 0)
658  pg_log_info("subscriber successfully changed the system identifier");
659  else
660  pg_fatal("subscriber failed to change system identifier: exit code: %d", rc);
661  }
662 
663  pg_free(cf);
664 }
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
#define DEVNULL
Definition: port.h:160
int gettimeofday(struct timeval *tp, void *tzp)

References DEVNULL, dry_run, get_controlfile(), gettimeofday(), pg_fatal, pg_free(), pg_log_debug, pg_log_info, pg_resetwal_path, psprintf(), subscriber_dir, ControlFileData::system_identifier, and update_controlfile().

Referenced by main().

◆ pg_ctl_status()

static void pg_ctl_status ( const char *  pg_ctl_cmd,
int  rc 
)
static

Definition at line 1277 of file pg_createsubscriber.c.

/*
 * Report the outcome of a pg_ctl invocation. rc is decoded with the
 * WIFEXITED/WIFSIGNALED family of macros; pg_ctl_cmd is the command line
 * that was run and is echoed in the error detail. When rc is zero nothing
 * happens. For any other value an error describing the failure is logged
 * and exit(1) is called.
 */
1278 {
1279  if (rc != 0)
1280  {
/* The command ran to completion but returned a non-zero exit code */
1281  if (WIFEXITED(rc))
1282  {
1283  pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
1284  }
/* The command was terminated; the report differs per platform */
1285  else if (WIFSIGNALED(rc))
1286  {
1287 #if defined(WIN32)
1288  pg_log_error("pg_ctl was terminated by exception 0x%X",
1289  WTERMSIG(rc));
1290  pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1291 #else
1292  pg_log_error("pg_ctl was terminated by signal %d: %s",
1293  WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1294 #endif
1295  }
/* Neither macro matched: print the raw status value */
1296  else
1297  {
1298  pg_log_error("pg_ctl exited with unrecognized status %d", rc);
1299  }
1300 
/* Every failure path shows the command and ends the program */
1301  pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
1302  exit(1);
1303  }
1304 }
#define pg_log_error_detail(...)
Definition: logging.h:109
const char * pg_strsignal(int signum)
Definition: pgstrsignal.c:39
#define WIFEXITED(w)
Definition: win32_port.h:152
#define WIFSIGNALED(w)
Definition: win32_port.h:153
#define WTERMSIG(w)
Definition: win32_port.h:155
#define WEXITSTATUS(w)
Definition: win32_port.h:154

References exit(), pg_log_error, pg_log_error_detail, pg_strsignal(), WEXITSTATUS, WIFEXITED, WIFSIGNALED, and WTERMSIG.

Referenced by start_standby_server(), and stop_standby_server().

◆ server_is_in_recovery()

static bool server_is_in_recovery ( PGconn *  conn)
static

Definition at line 778 of file pg_createsubscriber.c.

779 {
780  PGresult *res;
781  int ret;
782 
783  res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
784 
786  {
787  pg_log_error("could not obtain recovery progress: %s",
789  disconnect_database(conn, true);
790  }
791 
792 
793  ret = strcmp("t", PQgetvalue(res, 0, 0));
794 
795  PQclear(res);
796 
797  return ret == 0;
798 }

References conn, disconnect_database(), pg_log_error, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQresultErrorMessage(), PQresultStatus(), and res.

Referenced by check_publisher(), check_subscriber(), and wait_for_end_recovery().

◆ set_replication_progress()

static void set_replication_progress ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo,
const char *  lsn 
)
static

Definition at line 1629 of file pg_createsubscriber.c.

1630 {
1632  PGresult *res;
1633  Oid suboid;
1634  char *subname;
1635  char *dbname;
1636  char *originname;
1637  char *lsnstr;
1638 
1639  Assert(conn != NULL);
1640 
1643 
1645  "SELECT s.oid FROM pg_catalog.pg_subscription s "
1646  "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1647  "WHERE s.subname = %s AND d.datname = %s",
1648  subname, dbname);
1649 
1650  res = PQexec(conn, str->data);
1652  {
1653  pg_log_error("could not obtain subscription OID: %s",
1655  disconnect_database(conn, true);
1656  }
1657 
1658  if (PQntuples(res) != 1 && !dry_run)
1659  {
1660  pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1661  PQntuples(res), 1);
1662  disconnect_database(conn, true);
1663  }
1664 
1665  if (dry_run)
1666  {
1667  suboid = InvalidOid;
1668  lsnstr = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1669  }
1670  else
1671  {
1672  suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1673  lsnstr = psprintf("%s", lsn);
1674  }
1675 
1676  PQclear(res);
1677 
1678  /*
1679  * The origin name is defined as pg_%u. %u is the subscription OID. See
1680  * ApplyWorkerMain().
1681  */
1682  originname = psprintf("pg_%u", suboid);
1683 
1684  pg_log_info("setting the replication progress (node name \"%s\" ; LSN %s) on database \"%s\"",
1685  originname, lsnstr, dbinfo->dbname);
1686 
1689  "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1690  originname, lsnstr);
1691 
1692  pg_log_debug("command is: %s", str->data);
1693 
1694  if (!dry_run)
1695  {
1696  res = PQexec(conn, str->data);
1698  {
1699  pg_log_error("could not set replication progress for the subscription \"%s\": %s",
1701  disconnect_database(conn, true);
1702  }
1703  PQclear(res);
1704  }
1705 
1706  pg_free(subname);
1707  pg_free(dbname);
1708  pg_free(originname);
1709  pg_free(lsnstr);
1711 }
#define InvalidOid
Definition: postgres_ext.h:36
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, InvalidOid, InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), psprintf(), res, resetPQExpBuffer(), str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ setup_publisher()

static char * setup_publisher ( struct LogicalRepInfo *  dbinfo)
static

Definition at line 721 of file pg_createsubscriber.c.

722 {
723  char *lsn = NULL;
724 
725  pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
726 
727  for (int i = 0; i < num_dbs; i++)
728  {
729  PGconn *conn;
730  char *genname = NULL;
731 
732  conn = connect_database(dbinfo[i].pubconninfo, true);
733 
734  /*
735  * If an object name was not specified via a command-line option, assign
736  * a generated object name. The replication slot has a different rule.
737  * The subscription name is assigned to the replication slot name if
738  * no replication slot is specified. It follows the same rule as
739  * CREATE SUBSCRIPTION.
740  */
741  if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
742  genname = generate_object_name(conn);
743  if (num_pubs == 0)
744  dbinfo[i].pubname = pg_strdup(genname);
745  if (num_subs == 0)
746  dbinfo[i].subname = pg_strdup(genname);
747  if (num_replslots == 0)
749 
750  /*
751  * Create publication on publisher. This step should be executed
752  * *before* promoting the subscriber to avoid any transactions between
753  * consistent LSN and the new publication rows (such transactions
754  * wouldn't see the new publication rows resulting in an error).
755  */
757 
758  /* Create replication slot on publisher */
759  if (lsn)
760  pg_free(lsn);
762  if (lsn != NULL || dry_run)
763  pg_log_info("create replication slot \"%s\" on publisher",
764  dbinfo[i].replslotname);
765  else
766  exit(1);
767 
768  disconnect_database(conn, false);
769  }
770 
771  return lsn;
772 }
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static char * generate_object_name(PGconn *conn)
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition: pg_prng.c:89

References conn, connect_database(), create_logical_replication_slot(), create_publication(), dbinfo, disconnect_database(), dry_run, exit(), generate_object_name(), i, num_dbs, num_pubs, num_replslots, num_subs, pg_free(), pg_log_info, pg_prng_seed(), pg_strdup(), prng_state, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, LogicalRepInfo::subname, and subname.

Referenced by main().

◆ setup_recovery()

static void setup_recovery ( const struct LogicalRepInfo *  dbinfo,
const char *  datadir,
const char *  lsn 
)
static

Definition at line 1099 of file pg_createsubscriber.c.

1100 {
1101  PGconn *conn;
1103 
1104  /*
1105  * Although the recovery parameters will be written to the subscriber,
1106  * use a publisher connection. The primary_conninfo is generated using the
1107  * connection settings.
1108  */
1109  conn = connect_database(dbinfo[0].pubconninfo, true);
1110 
1111  /*
1112  * Write recovery parameters.
1113  *
1114  * The subscriber is not running yet. In dry run mode, the recovery
1115  * parameters *won't* be written. An invalid LSN is used for printing
1116  * purposes. Additional recovery parameters are added here. It avoids
1117  * unexpected behavior such as end of recovery as soon as a consistent
1118  * state is reached (recovery_target) and failure due to multiple recovery
1119  * targets (name, time, xid, LSN).
1120  */
1122  appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
1124  "recovery_target_timeline = 'latest'\n");
1126  "recovery_target_inclusive = true\n");
1128  "recovery_target_action = promote\n");
1129  appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
1130  appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
1131  appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
1132 
1133  if (dry_run)
1134  {
1135  appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
1137  "recovery_target_lsn = '%X/%X'\n",
1139  }
1140  else
1141  {
1142  appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1143  lsn);
1145  }
1146  disconnect_database(conn, false);
1147 
1148  pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1149 }
static PQExpBuffer recoveryconfcontents
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
Definition: recovery_gen.c:124
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
Definition: recovery_gen.c:27

References appendPQExpBuffer(), conn, connect_database(), PQExpBufferData::data, datadir, dbinfo, disconnect_database(), dry_run, GenerateRecoveryConfig(), InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_log_debug, recoveryconfcontents, and WriteRecoveryConfig().

Referenced by main().

◆ setup_subscriber()

static void setup_subscriber ( struct LogicalRepInfo *  dbinfo,
const char *  consistent_lsn 
)
static

Definition at line 1067 of file pg_createsubscriber.c.

1068 {
1069  for (int i = 0; i < num_dbs; i++)
1070  {
1071  PGconn *conn;
1072 
1073  /* Connect to subscriber. */
1074  conn = connect_database(dbinfo[i].subconninfo, true);
1075 
1076  /*
1077  * Since the publication was created before the consistent LSN, it is
1078  * available on the subscriber when the physical replica is promoted.
1079  * Remove publications from the subscriber because they have no use there.
1080  */
1082 
1084 
1085  /* Set the replication progress to the correct LSN */
1086  set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1087 
1088  /* Enable subscription */
1090 
1091  disconnect_database(conn, false);
1092  }
1093 }
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)

References conn, connect_database(), create_subscription(), dbinfo, disconnect_database(), drop_publication(), enable_subscription(), i, num_dbs, and set_replication_progress().

Referenced by main().

◆ start_standby_server()

static void start_standby_server ( const struct CreateSubscriberOptions *  opt,
bool  restricted_access 
)
static

Definition at line 1307 of file pg_createsubscriber.c.

1308 {
1309  PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1310  int rc;
1311 
1312  appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D \"%s\" -s",
1314  if (restricted_access)
1315  {
1316  appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1317 #if !defined(WIN32)
1318 
1319  /*
1320  * An empty listen_addresses list means the server does not listen on
1321  * any IP interfaces; only Unix-domain sockets can be used to connect
1322  * to the server. Prevent external connections to minimize the chance
1323  * of failure.
1324  */
1325  appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1326  if (opt->socket_dir)
1327  appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1328  opt->socket_dir);
1329  appendPQExpBufferChar(pg_ctl_cmd, '"');
1330 #endif
1331  }
1332  if (opt->config_file != NULL)
1333  appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1334  opt->config_file);
1335  pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1336  rc = system(pg_ctl_cmd->data);
1337  pg_ctl_status(pg_ctl_cmd->data, rc);
1338  standby_running = true;
1339  destroyPQExpBuffer(pg_ctl_cmd);
1340  pg_log_info("server was started");
1341 }
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)

References appendPQExpBuffer(), appendPQExpBufferChar(), appendPQExpBufferStr(), CreateSubscriberOptions::config_file, createPQExpBuffer(), PQExpBufferData::data, destroyPQExpBuffer(), pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, CreateSubscriberOptions::socket_dir, standby_running, CreateSubscriberOptions::sub_port, and subscriber_dir.

Referenced by main().

◆ stop_standby_server()

static void stop_standby_server ( const char *  datadir)
static

Definition at line 1344 of file pg_createsubscriber.c.

1345 {
1346  char *pg_ctl_cmd;
1347  int rc;
1348 
1349  pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1350  datadir);
1351  pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1352  rc = system(pg_ctl_cmd);
1353  pg_ctl_status(pg_ctl_cmd, rc);
1354  standby_running = false;
1355  pg_log_info("server was stopped");
1356 }

References datadir, pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, psprintf(), and standby_running.

Referenced by cleanup_objects_atexit(), main(), and wait_for_end_recovery().

◆ store_pub_sub_info()

static struct LogicalRepInfo * store_pub_sub_info ( const struct CreateSubscriberOptions *  opt,
const char *  pub_base_conninfo,
const char *  sub_base_conninfo 
)
static

Definition at line 420 of file pg_createsubscriber.c.

423 {
424  struct LogicalRepInfo *dbinfo;
425  SimpleStringListCell *pubcell = NULL;
426  SimpleStringListCell *subcell = NULL;
427  SimpleStringListCell *replslotcell = NULL;
428  int i = 0;
429 
431 
432  if (num_pubs > 0)
433  pubcell = opt->pub_names.head;
434  if (num_subs > 0)
435  subcell = opt->sub_names.head;
436  if (num_replslots > 0)
437  replslotcell = opt->replslot_names.head;
438 
439  for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
440  {
441  char *conninfo;
442 
443  /* Fill publisher attributes */
444  conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
445  dbinfo[i].pubconninfo = conninfo;
446  dbinfo[i].dbname = cell->val;
447  if (num_pubs > 0)
448  dbinfo[i].pubname = pubcell->val;
449  else
450  dbinfo[i].pubname = NULL;
451  if (num_replslots > 0)
452  dbinfo[i].replslotname = replslotcell->val;
453  else
454  dbinfo[i].replslotname = NULL;
455  dbinfo[i].made_replslot = false;
456  dbinfo[i].made_publication = false;
457  /* Fill subscriber attributes */
458  conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
459  dbinfo[i].subconninfo = conninfo;
460  if (num_subs > 0)
461  dbinfo[i].subname = subcell->val;
462  else
463  dbinfo[i].subname = NULL;
464  /* Other fields will be filled later */
465 
466  pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
467  dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
468  dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
469  dbinfo[i].pubconninfo);
470  pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
471  dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
472  dbinfo[i].subconninfo);
473 
474  if (num_pubs > 0)
475  pubcell = pubcell->next;
476  if (num_subs > 0)
477  subcell = subcell->next;
478  if (num_replslots > 0)
479  replslotcell = replslotcell->next;
480 
481  i++;
482  }
483 
484  return dbinfo;
485 }
#define pg_malloc_array(type, count)
Definition: fe_memutils.h:44
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
char val[FLEXIBLE_ARRAY_MEMBER]
Definition: simple_list.h:37
struct SimpleStringListCell * next
Definition: simple_list.h:34

References concat_conninfo_dbname(), CreateSubscriberOptions::database_names, dbinfo, LogicalRepInfo::dbname, SimpleStringList::head, i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, SimpleStringListCell::next, num_dbs, num_pubs, num_replslots, num_subs, pg_log_debug, pg_malloc_array, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, CreateSubscriberOptions::replslot_names, LogicalRepInfo::replslotname, CreateSubscriberOptions::sub_names, LogicalRepInfo::subconninfo, LogicalRepInfo::subname, subname, and SimpleStringListCell::val.

Referenced by main().

◆ usage()

static void usage ( void  )
static

Definition at line 211 of file pg_createsubscriber.c.

212 {
213  printf(_("%s creates a new logical replica from a standby server.\n\n"),
214  progname);
215  printf(_("Usage:\n"));
216  printf(_(" %s [OPTION]...\n"), progname);
217  printf(_("\nOptions:\n"));
218  printf(_(" -d, --database=DBNAME database to create a subscription\n"));
219  printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
220  printf(_(" -n, --dry-run dry run, just show what would be done\n"));
221  printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
222  printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
223  printf(_(" -s, --socket-directory=DIR socket directory to use (default current directory)\n"));
224  printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
225  printf(_(" -U, --subscriber-username=NAME subscriber username\n"));
226  printf(_(" -v, --verbose output verbose messages\n"));
227  printf(_(" --config-file=FILENAME use specified main server configuration\n"
228  " file when running target cluster\n"));
229  printf(_(" --publication=NAME publication name\n"));
230  printf(_(" --replication-slot=NAME replication slot name\n"));
231  printf(_(" --subscription=NAME subscription name\n"));
232  printf(_(" -V, --version output version information, then exit\n"));
233  printf(_(" -?, --help show this help, then exit\n"));
234  printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
235  printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
236 }
#define _(x)
Definition: elog.c:90
#define printf(...)
Definition: port.h:244

References _, DEFAULT_SUB_PORT, printf, and progname.

Referenced by main().

◆ wait_for_end_recovery()

static void wait_for_end_recovery ( const char *  conninfo,
const struct CreateSubscriberOptions *  opt 
)
static

Definition at line 1365 of file pg_createsubscriber.c.

1366 {
1367  PGconn *conn;
1368  int status = POSTMASTER_STILL_STARTING;
1369  int timer = 0;
1370  int count = 0; /* number of consecutive connection attempts */
1371 
1372 #define NUM_CONN_ATTEMPTS 10
1373 
1374  pg_log_info("waiting for the target server to reach the consistent state");
1375 
1376  conn = connect_database(conninfo, true);
1377 
1378  for (;;)
1379  {
1380  PGresult *res;
1381  bool in_recovery = server_is_in_recovery(conn);
1382 
1383  /*
1384  * Has the recovery process finished? In dry run mode, there is no
1385  * recovery mode. Bail out as the recovery process has ended.
1386  */
1387  if (!in_recovery || dry_run)
1388  {
1389  status = POSTMASTER_READY;
1390  recovery_ended = true;
1391  break;
1392  }
1393 
1394  /*
1395  * If it is still in recovery, make sure the target server is
1396  * connected to the primary so it can receive the required WAL to
1397  * finish the recovery process. If it is disconnected, try
1398  * NUM_CONN_ATTEMPTS times in a row and bail out if it does not succeed.
1399  */
1400  res = PQexec(conn,
1401  "SELECT 1 FROM pg_catalog.pg_stat_wal_receiver");
1402  if (PQntuples(res) == 0)
1403  {
1404  if (++count > NUM_CONN_ATTEMPTS)
1405  {
1407  pg_log_error("standby server disconnected from the primary");
1408  break;
1409  }
1410  }
1411  else
1412  count = 0; /* reset counter if it connects again */
1413 
1414  PQclear(res);
1415 
1416  /* Bail out after recovery_timeout seconds if this option is set */
1417  if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1418  {
1420  pg_log_error("recovery timed out");
1421  disconnect_database(conn, true);
1422  }
1423 
1424  /* Keep waiting */
1426 
1427  timer += WAIT_INTERVAL;
1428  }
1429 
1430  disconnect_database(conn, false);
1431 
1432  if (status == POSTMASTER_STILL_STARTING)
1433  pg_fatal("server did not end recovery");
1434 
1435  pg_log_info("target server reached the consistent state");
1436  pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1437 }
#define pg_log_info_hint(...)
Definition: logging.h:130
#define WAIT_INTERVAL
#define USEC_PER_SEC
#define NUM_CONN_ATTEMPTS
void pg_usleep(long microsec)
Definition: signal.c:53

References conn, connect_database(), disconnect_database(), dry_run, NUM_CONN_ATTEMPTS, pg_fatal, pg_log_error, pg_log_info, pg_log_info_hint, pg_usleep(), POSTMASTER_READY, POSTMASTER_STILL_STARTING, PQclear(), PQexec(), PQntuples(), recovery_ended, CreateSubscriberOptions::recovery_timeout, res, server_is_in_recovery(), stop_standby_server(), subscriber_dir, USEC_PER_SEC, and WAIT_INTERVAL.

Referenced by main().

Variable Documentation

◆ dbinfo

◆ dry_run

◆ num_dbs

◆ num_pubs

int num_pubs = 0
static

Definition at line 117 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_replslots

int num_replslots = 0
static

Definition at line 119 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_subs

int num_subs = 0
static

Definition at line 118 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ pg_ctl_path

char* pg_ctl_path = NULL
static

Definition at line 123 of file pg_createsubscriber.c.

Referenced by main(), start_standby_server(), and stop_standby_server().

◆ pg_resetwal_path

char* pg_resetwal_path = NULL
static

Definition at line 124 of file pg_createsubscriber.c.

Referenced by main(), and modify_subscriber_sysid().

◆ primary_slot_name

char* primary_slot_name = NULL
static

◆ prng_state

pg_prng_state prng_state
static

Definition at line 121 of file pg_createsubscriber.c.

Referenced by generate_object_name(), and setup_publisher().

◆ progname

const char* progname
static

Definition at line 108 of file pg_createsubscriber.c.

Referenced by get_exec_path(), get_sub_conninfo(), main(), and usage().

◆ recovery_ended

bool recovery_ended = false
static

Definition at line 129 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and wait_for_end_recovery().

◆ standby_running

bool standby_running = false
static

◆ subscriber_dir

char* subscriber_dir = NULL
static

◆ success

bool success = false
static

Definition at line 113 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and main().