20 #include "catalog/pg_authid_d.h"
31 #define DEFAULT_SUB_PORT "50432"
70 const char *pub_base_conninfo,
71 const char *sub_base_conninfo);
83 const char *consistent_lsn);
87 const char *slotname);
91 const char *slot_name);
94 bool restricted_access);
105 #define USEC_PER_SEC 1000000
106 #define WAIT_INTERVAL 1
165 "You must recreate the physical replica before continuing.");
192 pg_log_warning(
"publication \"%s\" in database \"%s\" on primary might be left behind",
198 pg_log_warning(
"replication slot \"%s\" in database \"%s\" on primary might be left behind",
213 printf(
_(
"%s creates a new logical replica from a standby server.\n\n"),
218 printf(
_(
" -d, --database=DBNAME database to create a subscription\n"));
219 printf(
_(
" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
220 printf(
_(
" -n, --dry-run dry run, just show what would be done\n"));
222 printf(
_(
" -P, --publisher-server=CONNSTR publisher connection string\n"));
223 printf(
_(
" -s, --socket-directory=DIR socket directory to use (default current directory)\n"));
224 printf(
_(
" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
225 printf(
_(
" -U, --subscriber-username=NAME subscriber username\n"));
226 printf(
_(
" -v, --verbose output verbose messages\n"));
227 printf(
_(
" --config-file=FILENAME use specified main server configuration\n"
228 " file when running target cluster\n"));
229 printf(
_(
" --publication=NAME publication name\n"));
230 printf(
_(
" --replication-slot=NAME replication slot name\n"));
231 printf(
_(
" --subscription=NAME subscription name\n"));
232 printf(
_(
" -V, --version output version information, then exit\n"));
233 printf(
_(
" -?, --help show this help, then exit\n"));
234 printf(
_(
"\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
235 printf(
_(
"%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
262 if (conn_opts == NULL)
271 for (conn_opt = conn_opts; conn_opt->
keyword != NULL; conn_opt++)
273 if (strcmp(conn_opt->
keyword,
"dbname") == 0 && conn_opt->
val != NULL)
280 if (conn_opt->
val != NULL && conn_opt->
val[0] !=
'\0')
346 pg_fatal(
"program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
347 progname,
"pg_createsubscriber", full_path);
349 pg_fatal(
"program \"%s\" was found by \"%s\" but was not the same version as %s",
350 progname, full_path,
"pg_createsubscriber");
369 pg_log_info(
"checking if directory \"%s\" is a cluster data directory",
381 if (
stat(versionfile, &statbuf) != 0 && errno == ENOENT)
383 pg_fatal(
"directory \"%s\" is not a database cluster directory",
421 const char *pub_base_conninfo,
422 const char *sub_base_conninfo)
466 pg_log_debug(
"publisher(%d): publication: %s ; replication slot: %s ; connection string: %s",
i,
470 pg_log_debug(
"subscriber(%d): subscription: %s ; connection string: %s",
i,
475 pubcell = pubcell->
next;
477 subcell = subcell->
next;
479 replslotcell = replslotcell->
next;
553 pg_log_info(
"getting system identifier from publisher");
557 res =
PQexec(
conn,
"SELECT system_identifier FROM pg_catalog.pg_control_system()");
566 pg_log_error(
"could not get system identifier: got %d rows, expected %d row",
573 pg_log_info(
"system identifier is %llu on publisher",
574 (
unsigned long long) sysid);
594 pg_log_info(
"getting system identifier from subscriber");
598 pg_fatal(
"control file appears to be corrupt");
602 pg_log_info(
"system identifier is %llu on subscriber",
603 (
unsigned long long) sysid);
624 pg_log_info(
"modifying system identifier of subscriber");
628 pg_fatal(
"control file appears to be corrupt");
643 pg_log_info(
"system identifier is %llu on subscriber",
646 pg_log_info(
"running pg_resetwal on the subscriber");
655 int rc = system(cmd_str);
658 pg_log_info(
"subscriber successfully changed the system identifier");
660 pg_fatal(
"subscriber failed to change system identifier: exit code: %d", rc);
680 "SELECT oid FROM pg_catalog.pg_database "
681 "WHERE datname = pg_catalog.current_database()");
691 pg_log_error(
"could not obtain database OID: got %d rows, expected %d row",
709 objname =
psprintf(
"pg_createsubscriber_%u_%x", oid, rand);
730 char *genname = NULL;
763 pg_log_info(
"create replication slot \"%s\" on publisher",
844 "(SELECT setting AS wallevel FROM pg_catalog.pg_settings "
845 "WHERE name = 'wal_level'), "
847 "(SELECT setting AS tmrs FROM pg_catalog.pg_settings "
848 "WHERE name = 'max_replication_slots'), "
850 "(SELECT count(*) AS cmrs "
851 "FROM pg_catalog.pg_replication_slots), "
853 "(SELECT setting AS tmws FROM pg_catalog.pg_settings "
854 "WHERE name = 'max_wal_senders'), "
856 "(SELECT count(*) AS cmws FROM pg_catalog.pg_stat_activity "
857 "WHERE backend_type = 'walsender') "
858 "SELECT wallevel, tmrs, cmrs, tmws, cmws "
859 "FROM wl, total_mrs, cur_mrs, total_mws, cur_mws");
877 pg_log_debug(
"publisher: max_replication_slots: %d", max_repslots);
878 pg_log_debug(
"publisher: current replication slots: %d", cur_repslots);
879 pg_log_debug(
"publisher: max_wal_senders: %d", max_walsenders);
880 pg_log_debug(
"publisher: current wal senders: %d", cur_walsenders);
894 "SELECT 1 FROM pg_catalog.pg_replication_slots "
895 "WHERE active AND slot_name = %s",
905 pg_log_error(
"could not obtain replication slot information: %s",
912 pg_log_error(
"could not obtain replication slot information: got %d rows, expected %d row",
927 pg_log_error(
"publisher requires wal_level >= \"logical\"");
931 if (max_repslots - cur_repslots <
num_dbs)
933 pg_log_error(
"publisher requires %d replication slots, but only %d remain",
934 num_dbs, max_repslots - cur_repslots);
940 if (max_walsenders - cur_walsenders <
num_dbs)
942 pg_log_error(
"publisher requires %d wal sender processes, but only %d remain",
943 num_dbs, max_walsenders - cur_walsenders);
999 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1000 "'max_logical_replication_workers', "
1001 "'max_replication_slots', "
1002 "'max_worker_processes', "
1003 "'primary_slot_name') "
1008 pg_log_error(
"could not obtain subscriber settings: %s",
1019 pg_log_debug(
"subscriber: max_logical_replication_workers: %d",
1021 pg_log_debug(
"subscriber: max_replication_slots: %d", max_repslots);
1022 pg_log_debug(
"subscriber: max_worker_processes: %d", max_wprocs);
1032 pg_log_error(
"subscriber requires %d replication slots, but only %d remain",
1041 pg_log_error(
"subscriber requires %d logical replication workers, but only %d remain",
1043 pg_log_error_hint(
"Consider increasing max_logical_replication_workers to at least %d.",
1050 pg_log_error(
"subscriber requires %d worker processes, but only %d remain",
1124 "recovery_target_timeline = 'latest'\n");
1126 "recovery_target_inclusive = true\n");
1128 "recovery_target_action = promote\n");
1137 "recovery_target_lsn = '%X/%X'\n",
1175 pg_log_warning(
"could not drop replication slot \"%s\" on primary",
1193 char *slot_name_esc;
1198 pg_log_info(
"creating the replication slot \"%s\" on database \"%s\"",
1204 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
1216 pg_log_error(
"could not create replication slot \"%s\" on database \"%s\": %s",
1238 const char *slot_name)
1241 char *slot_name_esc;
1246 pg_log_info(
"dropping the replication slot \"%s\" on database \"%s\"",
1262 pg_log_error(
"could not drop replication slot \"%s\" on database \"%s\": %s",
1288 pg_log_error(
"pg_ctl was terminated by exception 0x%X",
1290 pg_log_error_detail(
"See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1298 pg_log_error(
"pg_ctl exited with unrecognized status %d", rc);
1314 if (restricted_access)
1325 appendPQExpBufferStr(pg_ctl_cmd,
" -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1336 rc = system(pg_ctl_cmd->
data);
1352 rc = system(pg_ctl_cmd);
1372 #define NUM_CONN_ATTEMPTS 10
1374 pg_log_info(
"waiting for the target server to reach the consistent state");
1401 "SELECT 1 FROM pg_catalog.pg_stat_wal_receiver");
1407 pg_log_error(
"standby server disconnected from the primary");
1433 pg_fatal(
"server did not end recovery");
1435 pg_log_info(
"target server reached the consistent state");
1436 pg_log_info_hint(
"If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1457 "SELECT 1 FROM pg_catalog.pg_publication "
1458 "WHERE pubname = %s",
1463 pg_log_error(
"could not obtain publication information: %s",
1485 pg_log_info(
"creating publication \"%s\" on database \"%s\"",
1498 pg_log_error(
"could not create publication \"%s\" on database \"%s\": %s",
1527 pg_log_info(
"dropping publication \"%s\" on database \"%s\"",
1541 pg_log_error(
"could not drop publication \"%s\" on database \"%s\": %s",
1577 char *pubconninfo_esc;
1578 char *replslotname_esc;
1587 pg_log_info(
"creating subscription \"%s\" on database \"%s\"",
1591 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1592 "WITH (create_slot = false, enabled = false, "
1593 "slot_name = %s, copy_data = false)",
1594 subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
1608 pg_log_error(
"could not create subscription \"%s\" on database \"%s\": %s",
1645 "SELECT s.oid FROM pg_catalog.pg_subscription s "
1646 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1647 "WHERE s.subname = %s AND d.datname = %s",
1660 pg_log_error(
"could not obtain subscription OID: got %d rows, expected %d row",
1682 originname =
psprintf(
"pg_%u", suboid);
1684 pg_log_info(
"setting the replication progress (node name \"%s\" ; LSN %s) on database \"%s\"",
1689 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1690 originname, lsnstr);
1699 pg_log_error(
"could not set replication progress for the subscription \"%s\": %s",
1730 pg_log_info(
"enabling subscription \"%s\" on database \"%s\"",
1742 pg_log_error(
"could not enable subscription \"%s\": %s",
1757 static struct option long_options[] =
1782 char *pub_base_conninfo;
1783 char *sub_base_conninfo;
1784 char *dbname_conninfo = NULL;
1788 struct stat statbuf;
1790 char *consistent_lsn;
1801 if (strcmp(argv[1],
"--help") == 0 || strcmp(argv[1],
"-?") == 0)
1806 else if (strcmp(argv[1],
"-V") == 0
1807 || strcmp(argv[1],
"--version") == 0)
1809 puts(
"pg_createsubscriber (PostgreSQL) " PG_VERSION);
1843 while ((
c =
getopt_long(argc, argv,
"d:D:np:P:s:t:U:v",
1844 long_options, &option_index)) != -1)
1935 pg_log_error(
"too many command-line arguments (first is \"%s\")",
1944 pg_log_error(
"no subscriber data directory specified");
1955 pg_fatal(
"could not determine current directory");
1972 pg_log_error(
"no publisher connection string specified");
1976 pg_log_info(
"validating connection string on publisher");
1979 if (pub_base_conninfo == NULL)
1982 pg_log_info(
"validating connection string on subscriber");
1994 if (dbname_conninfo)
1999 pg_log_info(
"database \"%s\" was extracted from the publisher connection string",
2015 pg_log_error_hint(
"Number of publication names (%d) must match number of database names (%d).",
2022 pg_log_error_hint(
"Number of subscription names (%d) must match number of database names (%d).",
2028 pg_log_error(
"wrong number of replication slot names");
2029 pg_log_error_hint(
"Number of replication slot names (%d) must match number of database names (%d).",
2057 if (pub_sysid != sub_sysid)
2058 pg_fatal(
"subscriber data directory is not a copy of the source database cluster");
2069 if (
stat(pidfile, &statbuf) == 0)
2081 pg_log_info(
"starting the standby with command-line options");
#define Assert(condition)
#define PG_TEXTDOMAIN(domain)
#define strtou64(str, endptr, base)
int find_my_exec(const char *argv0, char *retpath)
void set_pglocale_pgservice(const char *argv0, const char *app)
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
#define ALWAYS_SECURE_SEARCH_PATH_SQL
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
static void PGresult * res
int errmsg(const char *fmt,...)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
void PQconninfoFree(PQconninfoOption *connOptions)
char * PQerrorMessage(const PGconn *conn)
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
PGconn * PQconnectdb(const char *conninfo)
void PQfreemem(void *ptr)
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
ExecStatusType PQresultStatus(const PGresult *res)
char * PQresultErrorMessage(const PGresult *res)
int PQntuples(const PGresult *res)
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
PGresult * PQexec(PGconn *conn, const char *query)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
char * pg_strdup(const char *in)
void * pg_malloc(size_t size)
#define pg_malloc_array(type, count)
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
#define required_argument
void pg_logging_increase_verbosity(void)
void pg_logging_init(const char *argv0)
void pg_logging_set_level(enum pg_log_level new_level)
#define pg_log_error(...)
#define pg_log_error_hint(...)
#define pg_log_warning_hint(...)
#define pg_log_info_hint(...)
#define pg_log_error_detail(...)
#define pg_log_debug(...)
static PQExpBuffer recoveryconfcontents
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static struct LogicalRepInfo * dbinfo
static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
static void stop_standby_server(const char *datadir)
static char * pg_ctl_path
static bool server_is_in_recovery(PGconn *conn)
static char * get_exec_path(const char *argv0, const char *progname)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static char * subscriber_dir
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
int main(int argc, char **argv)
static char * primary_slot_name
static void cleanup_objects_atexit(void)
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static pg_prng_state prng_state
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
static void check_data_directory(const char *datadir)
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
@ POSTMASTER_STILL_STARTING
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static uint64 get_standby_sysid(const char *datadir)
static bool recovery_ended
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void disconnect_database(PGconn *conn, bool exit_on_error)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
#define NUM_CONN_ATTEMPTS
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static char * pg_resetwal_path
static char * generate_object_name(PGconn *conn)
static const char * progname
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
PGDLLIMPORT char * optarg
uint32 pg_prng_uint32(pg_prng_state *state)
void pg_prng_seed(pg_prng_state *state, uint64 seed)
#define pg_log_warning(...)
void canonicalize_path(char *path)
const char * get_progname(const char *argv0)
const char * pg_strsignal(int signum)
size_t strlcpy(char *dst, const char *src, size_t siz)
PQExpBuffer createPQExpBuffer(void)
void resetPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
char * psprintf(const char *fmt,...)
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
void get_restricted_token(void)
void pg_usleep(long microsec)
bool simple_string_list_member(SimpleStringList *list, const char *val)
void simple_string_list_append(SimpleStringList *list, const char *val)
struct SimpleStringList SimpleStringList
const char * sub_username
SimpleStringList database_names
SimpleStringList sub_names
SimpleStringList replslot_names
SimpleStringList pub_names
char val[FLEXIBLE_ARRAY_MEMBER]
struct SimpleStringListCell * next
SimpleStringListCell * head
int gettimeofday(struct timeval *tp, void *tzp)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr