35#define DEFAULT_SUB_PORT "50432"
36#define OBJECTTYPE_PUBLICATIONS 0x0001
49#define PG_AUTOCONF_FILENAME "postgresql.auto.conf"
50#define INCLUDED_CONF_FILE "pg_createsubscriber.conf"
51#define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled"
53#define SERVER_LOG_FILE_NAME "pg_createsubscriber_server.log"
54#define INTERNAL_LOG_FILE_NAME "pg_createsubscriber_internal.log"
102static void usage(
void);
126 const char *slotname);
131 const char *slot_name);
167#define WAIT_INTERVAL 1
286 "A manual removal of the recovery parameters may be required.");
301 "failed after the end of recovery");
303 "The target server cannot be used as a physical replica anymore. "
304 "You must recreate the physical replica before continuing.");
335 "publication \"%s\" created in database \"%s\" on primary was left behind",
339 "Drop this publication before trying again.");
344 "replication slot \"%s\" created in database \"%s\" on primary was left behind",
348 "Drop this replication slot soon to avoid retention of WAL files.");
361 printf(
_(
"%s creates a new logical replica from a standby server.\n\n"),
366 printf(
_(
" -a, --all create subscriptions for all databases except template\n"
367 " databases and databases that don't allow connections\n"));
368 printf(
_(
" -d, --database=DBNAME database in which to create a subscription\n"));
369 printf(
_(
" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
370 printf(
_(
" -l, --logdir=LOGDIR location for the log directory\n"));
371 printf(
_(
" -n, --dry-run dry run, just show what would be done\n"));
373 printf(
_(
" -P, --publisher-server=CONNSTR publisher connection string\n"));
374 printf(
_(
" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
375 printf(
_(
" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
376 printf(
_(
" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
377 printf(
_(
" -U, --subscriber-username=NAME user name for subscriber connection\n"));
378 printf(
_(
" -v, --verbose output verbose messages\n"));
379 printf(
_(
" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
380 " databases on the subscriber; accepts: \"%s\"\n"),
"publications");
381 printf(
_(
" --config-file=FILENAME use specified main server configuration\n"
382 " file when running target cluster\n"));
383 printf(
_(
" --publication=NAME publication name\n"));
384 printf(
_(
" --replication-slot=NAME replication slot name\n"));
385 printf(
_(
" --subscription=NAME subscription name\n"));
386 printf(
_(
" -V, --version output version information, then exit\n"));
387 printf(
_(
" -?, --help show this help, then exit\n"));
431 "could not parse connection string: %s",
errmsg);
508 report_createsub_fatal(
"program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
534 "checking if directory \"%s\" is a cluster data directory",
554 "data directory is of wrong version");
556 "File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
641 "publisher(%d): publication: %s ; replication slot: %s ; connection string: %s",
i,
646 "subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s",
i,
678 "connection to database failed: %s",
692 "could not clear \"search_path\": %s",
733 "getting system identifier from publisher");
737 res =
PQexec(
conn,
"SELECT system_identifier FROM pg_catalog.pg_control_system()");
741 "could not get system identifier: %s",
748 "could not get system identifier: got %d rows, expected %d row",
756 "system identifier is %" PRIu64 " on publisher",
sysid);
777 "getting system identifier from subscriber");
786 "system identifier is %" PRIu64 " on subscriber",
sysid);
809 "modifying system identifier of subscriber");
821 cf->system_identifier = ((
uint64) tv.tv_sec) << 32;
822 cf->system_identifier |= ((
uint64) tv.tv_usec) << 12;
823 cf->system_identifier |=
getpid() & 0xFFF;
827 "dry-run: would set system identifier to %" PRIu64 " on subscriber",
828 cf->system_identifier);
833 "system identifier is %" PRIu64 " on subscriber",
834 cf->system_identifier);
839 "dry-run: would run pg_resetwal on the subscriber");
842 "running pg_resetwal on the subscriber");
860 "pg_resetwal command is: %s",
cmd_str);
868 "successfully reset WAL on the subscriber");
891 "SELECT oid FROM pg_catalog.pg_database "
892 "WHERE datname = pg_catalog.current_database()");
896 "could not obtain database OID: %s",
904 "could not obtain database OID: got %d rows, expected %d row",
922 objname =
psprintf(
"pg_createsubscriber_%u_%x", oid,
rand);
939 "SELECT 1 FROM pg_catalog.pg_publication "
940 "WHERE pubname = %s",
946 "could not find publication \"%s\" in database \"%s\": %s",
1001 "use existing publication \"%s\" in database \"%s\"",
1002 dbinfo[
i].pubname, dbinfo[
i].
dbname);
1037 res =
PQexec(
conn,
"SELECT pg_log_standby_snapshot()");
1041 "could not write an additional WAL record: %s",
1063 res =
PQexec(
conn,
"SELECT pg_catalog.pg_is_in_recovery()");
1068 "could not obtain recovery progress: %s",
1157 (
unsigned int) (
tval.tv_usec / 1000));
1184 bool failed =
false;
1195 "checking settings on publisher");
1206 "primary server cannot be in recovery");
1222 "SELECT pg_catalog.current_setting('wal_level'),"
1223 " pg_catalog.current_setting('max_replication_slots'),"
1224 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
1225 " pg_catalog.current_setting('max_wal_senders'),"
1226 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
1227 " pg_catalog.current_setting('max_prepared_transactions'),"
1228 " pg_catalog.current_setting('max_slot_wal_keep_size')");
1233 "could not obtain publisher settings: %s",
1253 "publisher: current replication slots: %d",
cur_repslots);
1259 "publisher: max_prepared_transactions: %d",
1262 "publisher: max_slot_wal_keep_size: %s",
1270 "publisher requires \"wal_level\" >= \"replica\"");
1277 "publisher requires %d replication slots, but only %d remain",
1280 "Increase the configuration parameter \"%s\" to at least %d.",
1288 "publisher requires %d WAL sender processes, but only %d remain",
1291 "Increase the configuration parameter \"%s\" to at least %d.",
1299 "two_phase option will not be enabled for replication slots");
1301 "Subscriptions will be created with the two_phase option disabled. "
1302 "Prepared transactions will be replicated at COMMIT PREPARED.");
1304 "You can use the command-line option --enable-two-phase to enable two_phase.");
1315 "required WAL could be removed from the publisher");
1317 "Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1318 "max_slot_wal_keep_size");
1343 bool failed =
false;
1350 "checking settings on subscriber");
1358 "target server must be a standby");
1373 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1374 "'max_logical_replication_workers', "
1375 "'max_active_replication_origins', "
1376 "'max_worker_processes', "
1377 "'primary_slot_name') "
1383 "could not obtain subscriber settings: %s",
1395 "subscriber: max_logical_replication_workers: %d",
1400 "subscriber: max_worker_processes: %d",
max_wprocs);
1412 "subscriber requires %d active replication origins, but only %d remain",
1415 "Increase the configuration parameter \"%s\" to at least %d.",
1416 "max_active_replication_origins",
num_dbs);
1423 "subscriber requires %d logical replication workers, but only %d remain",
1426 "Increase the configuration parameter \"%s\" to at least %d.",
1427 "max_logical_replication_workers",
num_dbs);
1434 "subscriber requires %d worker processes, but only %d remain",
1437 "Increase the configuration parameter \"%s\" to at least %d.",
1438 "max_worker_processes",
num_dbs + 1);
1472 "dry-run: would drop subscription \"%s\" in database \"%s\"",
1477 "dropping subscription \"%s\" in database \"%s\"",
1485 "could not drop subscription \"%s\": %s",
1512 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1513 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1514 "WHERE d.datname = %s",
1521 "could not obtain pre-existing subscriptions: %s",
1602 "recovery_target_timeline = 'latest'\n");
1613 "recovery_target_inclusive = false\n");
1615 "recovery_target_action = promote\n");
1624 "recovery_target_lsn = '%X/%08X'\n",
1689 "could not drop replication slot \"%s\" on primary",
1692 "Drop this replication slot soon to avoid retention of WAL files.");
1714 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1725 "could not obtain failover replication slot information: %s",
1728 "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1737 "could not drop failover replication slot");
1739 "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1762 "dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1763 slot_name, dbinfo->
dbname);
1766 "creating the replication slot \"%s\" in database \"%s\" on publisher",
1767 slot_name, dbinfo->
dbname);
1772 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1779 "command is: %s",
str->data);
1787 "could not create replication slot \"%s\" in database \"%s\": %s",
1788 slot_name, dbinfo->
dbname,
1809 const char *slot_name)
1819 "dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1820 slot_name, dbinfo->
dbname);
1823 "dropping the replication slot \"%s\" in database \"%s\"",
1824 slot_name, dbinfo->
dbname);
1833 "command is: %s",
str->data);
1841 "could not drop replication slot \"%s\" in database \"%s\": %s",
1863 "pg_ctl failed with exit code %d",
1870 "pg_ctl was terminated by exception 0x%X",
1873 "See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1876 "pg_ctl was terminated by signal %d: %s",
1883 "pg_ctl exited with unrecognized status %d", rc);
1942 "server was started");
1959 "server was stopped");
1979 "waiting for the target server to reach the consistent state");
1998 "recovery timed out");
2013 "target server reached the consistent state");
2015 "If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
2036 "SELECT 1 FROM pg_catalog.pg_publication "
2037 "WHERE pubname = %s",
2043 "could not obtain publication information: %s",
2058 "publication \"%s\" already exists", dbinfo->
pubname);
2060 "Consider renaming this publication before continuing.");
2069 "dry-run: would create publication \"%s\" in database \"%s\"",
2073 "creating publication \"%s\" in database \"%s\"",
2080 "command is: %s",
str->data);
2088 "could not create publication \"%s\" in database \"%s\": %s",
2108 bool *made_publication)
2120 "dry-run: would drop publication \"%s\" in database \"%s\"",
2124 "dropping publication \"%s\" in database \"%s\"",
2132 "command is: %s",
str->data);
2140 "could not drop publication \"%s\" in database \"%s\": %s",
2142 *made_publication =
false;
2177 "dropping all existing publications in database \"%s\"",
2181 res =
PQexec(
conn,
"SELECT pubname FROM pg_catalog.pg_publication;");
2185 "could not obtain publication information: %s",
2210 "dry-run: would preserve existing publication \"%s\" in database \"%s\"",
2214 "preserve existing publication \"%s\" in database \"%s\"",
2250 "dry-run: would create subscription \"%s\" in database \"%s\"",
2254 "creating subscription \"%s\" in database \"%s\"",
2258 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
2259 "WITH (create_slot = false, enabled = false, "
2260 "slot_name = %s, copy_data = false, two_phase = %s)",
2270 "command is: %s",
str->data);
2278 "could not create subscription \"%s\" in database \"%s\": %s",
2315 "SELECT s.oid FROM pg_catalog.pg_subscription s "
2316 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
2317 "WHERE s.subname = %s AND d.datname = %s",
2324 "could not obtain subscription OID: %s",
2332 "could not obtain subscription OID: got %d rows, expected %d row",
2358 "dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2362 "setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2367 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
2371 "command is: %s",
str->data);
2379 "could not set replication progress for subscription \"%s\": %s",
2412 "dry-run: would enable subscription \"%s\" in database \"%s\"",
2416 "enabling subscription \"%s\" in database \"%s\"",
2422 "command is: %s",
str->data);
2430 "could not enable subscription \"%s\": %s",
2473 res =
PQexec(
conn,
"SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2477 "could not obtain a list of databases: %s",
2548 if (
strcmp(argv[1],
"--help") == 0 ||
strcmp(argv[1],
"-?") == 0)
2553 else if (
strcmp(argv[1],
"-V") == 0
2554 ||
strcmp(argv[1],
"--version") == 0)
2585 "cannot be executed by \"root\"");
2587 "You must run %s as the PostgreSQL superuser.",
2595 while ((
c =
getopt_long(argc, argv,
"ad:D:l:np:P:s:t:TU:v",
2684 "Try \"%s --help\" for more information.",
2707 "options %s and %s cannot be used together",
2710 "Try \"%s --help\" for more information.",
2720 "too many command-line arguments (first is \"%s\")",
2723 "Try \"%s --help\" for more information.",
progname);
2731 "no subscriber data directory specified");
2733 "Try \"%s --help\" for more information.",
progname);
2761 "no publisher connection string specified");
2763 "Try \"%s --help\" for more information.",
progname);
2794 "Executing in dry-run mode.\n"
2795 "The target directory will not be modified.");
2798 "validating publisher connection string");
2805 "validating subscriber connection string");
2823 "no database was specified");
2835 "database name \"%s\" was extracted from the publisher connection string",
2841 "no database name specified");
2843 "Try \"%s --help\" for more information.",
2853 "wrong number of publication names specified");
2855 "The number of specified publication names (%d) must match the number of specified database names (%d).",
2862 "wrong number of subscription names specified");
2864 "The number of specified subscription names (%d) must match the number of specified database names (%d).",
2871 "wrong number of replication slot names specified");
2873 "The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2886 "invalid object type \"%s\" specified for %s",
2887 cell->val,
"--clean");
2889 "The valid value is: \"%s\"",
"publications");
2934 "standby server is running");
2936 "Stop the standby server and try again.");
2946 "starting the standby server with command-line options");
2963 "stopping the subscriber");
2978 "starting the subscriber");
3000 "stopping the subscriber");
Datum now(PG_FUNCTION_ARGS)
#define Assert(condition)
#define PG_TEXTDOMAIN(domain)
#define pg_attribute_printf(f, a)
int find_my_exec(const char *argv0, char *retpath)
void set_pglocale_pgservice(const char *argv0, const char *app)
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
#define ALWAYS_SECURE_SEARCH_PATH_SQL
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
#define fprintf(file, fmt, msg)
int durable_rename(const char *oldfile, const char *newfile, int elevel)
PGconn * PQconnectdb(const char *conninfo)
void PQconninfoFree(PQconninfoOption *connOptions)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
void PQfreemem(void *ptr)
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
PGresult * PQexec(PGconn *conn, const char *query)
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
void * pg_malloc(size_t size)
char * pg_strdup(const char *in)
#define pg_malloc_array(type, count)
uint32 get_pg_version(const char *datadir, char **version_str)
bool GetDataDirectoryCreatePerm(const char *dataDir)
#define PG_MODE_MASK_OWNER
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
#define required_argument
#define PQresultErrorMessage
void pg_logging_increase_verbosity(void)
void pg_logging_init(const char *argv0)
void pg_logging_set_level(enum pg_log_level new_level)
void pg_log_generic_v(enum pg_log_level level, enum pg_log_part part, const char *pg_restrict fmt, va_list ap)
enum pg_log_level __pg_log_level
static PQExpBuffer recoveryconfcontents
static PgChecksumMode mode
static void static void static pg_noreturn void report_createsub_fatal(const char *pg_restrict fmt,...) pg_attribute_printf(1
#define SERVER_LOG_FILE_NAME
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
#define INTERNAL_LOG_FILE_NAME
static void stop_standby_server(const char *datadir)
static void static void report_createsub_log_v(enum pg_log_level level, enum pg_log_part part, const char *pg_restrict fmt, va_list args) pg_attribute_printf(3
static char * pg_ctl_path
static bool server_is_in_recovery(PGconn *conn)
static char * get_exec_path(const char *argv0, const char *progname)
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static char * subscriber_dir
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
static char * primary_slot_name
static void cleanup_objects_atexit(void)
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static pg_prng_state prng_state
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
static FILE * logfile_open(const char *filename, const char *mode)
static void check_data_directory(const char *datadir)
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
#define INCLUDED_CONF_FILE
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static void report_createsub_log(enum pg_log_level, enum pg_log_part, const char *pg_restrict fmt,...) pg_attribute_printf(3
static struct LogicalRepInfos dbinfos
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
static uint64 get_standby_sysid(const char *datadir)
static char logdir[MAXPGPATH]
static bool recovery_ended
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static void drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void disconnect_database(PGconn *conn, bool exit_on_error)
static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static bool find_publication(PGconn *conn, const char *pubname, const char *dbname)
static void static void static pg_noreturn void static void internal_log_file_write(enum pg_log_level level, enum pg_log_part part, const char *pg_restrict fmt, va_list args) pg_attribute_printf(3
static FILE * internal_log_file_fp
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
#define OBJECTTYPE_PUBLICATIONS
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static char * pg_resetwal_path
static char * generate_object_name(PGconn *conn)
static const char * progname
static void make_output_dirs(const char *log_basedir)
#define INCLUDED_CONF_FILE_DISABLED
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
static bool recovery_params_set
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
PGDLLIMPORT char * optarg
uint32 pg_prng_uint32(pg_prng_state *state)
void pg_prng_seed(pg_prng_state *state, uint64 seed)
static char buf[DEFAULT_XLOG_SEG_SIZE]
int pg_strcasecmp(const char *s1, const char *s2)
const char * pg_strsignal(int signum)
void canonicalize_path(char *path)
const char * get_progname(const char *argv0)
size_t strlcpy(char *dst, const char *src, size_t siz)
PQExpBuffer createPQExpBuffer(void)
void resetPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
static int fd(const char *x, int i)
char * psprintf(const char *fmt,...)
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
void get_restricted_token(void)
void pg_usleep(long microsec)
bool simple_string_list_member(SimpleStringList *list, const char *val)
void simple_string_list_append(SimpleStringList *list, const char *val)
void appendShellString(PQExpBuffer buf, const char *str)
void appendConnStrVal(PQExpBuffer buf, const char *str)
const char * sub_username
SimpleStringList database_names
SimpleStringList objecttypes_to_clean
SimpleStringList sub_names
SimpleStringList replslot_names
SimpleStringList pub_names
struct LogicalRepInfo * dbinfo
uint32 objecttypes_to_clean
SimpleStringListCell * head
#define GET_PG_MAJORVERSION_NUM(v)
char * wait_result_to_str(int exitstatus)
int gettimeofday(struct timeval *tp, void *tzp)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr