PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pg_createsubscriber.c File Reference
#include "postgres_fe.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/wait.h>
#include "common/connect.h"
#include "common/controldata_utils.h"
#include "common/file_perm.h"
#include "common/file_utils.h"
#include "common/logging.h"
#include "common/pg_prng.h"
#include "common/restricted_token.h"
#include "datatype/timestamp.h"
#include "fe_utils/recovery_gen.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
#include "fe_utils/version.h"
#include "getopt_long.h"
Include dependency graph for pg_createsubscriber.c:

Go to the source code of this file.

Data Structures

struct  CreateSubscriberOptions
 
struct  LogicalRepInfo
 
struct  LogicalRepInfos
 

Macros

#define DEFAULT_SUB_PORT   "50432"
 
#define OBJECTTYPE_PUBLICATIONS   0x0001
 
#define PG_AUTOCONF_FILENAME   "postgresql.auto.conf"
 
#define INCLUDED_CONF_FILE   "pg_createsubscriber.conf"
 
#define INCLUDED_CONF_FILE_DISABLED   INCLUDED_CONF_FILE ".disabled"
 
#define SERVER_LOG_FILE_NAME   "pg_createsubscriber_server.log"
 
#define INTERNAL_LOG_FILE_NAME   "pg_createsubscriber_internal.log"
 
#define WAIT_INTERVAL   1 /* 1 second */
 

Functions

static void cleanup_objects_atexit (void)
 
static void usage (void)
 
static char * get_base_conninfo (const char *conninfo, char **dbname)
 
static char * get_sub_conninfo (const struct CreateSubscriberOptions *opt)
 
static char * get_exec_path (const char *argv0, const char *progname)
 
static void check_data_directory (const char *datadir)
 
static char * concat_conninfo_dbname (const char *conninfo, const char *dbname)
 
static struct LogicalRepInfo * store_pub_sub_info (const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
 
static PGconn * connect_database (const char *conninfo, bool exit_on_error)
 
static void disconnect_database (PGconn *conn, bool exit_on_error)
 
static uint64 get_primary_sysid (const char *conninfo)
 
static uint64 get_standby_sysid (const char *datadir)
 
static void modify_subscriber_sysid (const struct CreateSubscriberOptions *opt)
 
static bool server_is_in_recovery (PGconn *conn)
 
static char * generate_object_name (PGconn *conn)
 
static void check_publisher (const struct LogicalRepInfo *dbinfo)
 
static char * setup_publisher (struct LogicalRepInfo *dbinfo)
 
static void check_subscriber (const struct LogicalRepInfo *dbinfo)
 
static void setup_subscriber (struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 
static void setup_recovery (const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
 
static void drop_primary_replication_slot (struct LogicalRepInfo *dbinfo, const char *slotname)
 
static void drop_failover_replication_slots (struct LogicalRepInfo *dbinfo)
 
static char * create_logical_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void drop_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
 
static void pg_ctl_status (const char *pg_ctl_cmd, int rc)
 
static void start_standby_server (const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
 
static void stop_standby_server (const char *datadir)
 
static void wait_for_end_recovery (const char *conninfo, const struct CreateSubscriberOptions *opt)
 
static void create_publication (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static bool find_publication (PGconn *conn, const char *pubname, const char *dbname)
 
static void drop_publication (PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
 
static void check_and_drop_publications (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void create_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void set_replication_progress (PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
 
static void enable_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void check_and_drop_existing_subscriptions (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void drop_existing_subscription (PGconn *conn, const char *subname, const char *dbname)
 
static void get_publisher_databases (struct CreateSubscriberOptions *opt, bool dbnamespecified)
 
static void appendConnStrItem (PQExpBuffer buf, const char *keyword, const char *val)
 
static void make_output_dirs (const char *log_basedir)
 
int main (int argc, char **argv)
 

Variables

static const char * progname
 
static char * primary_slot_name = NULL
 
static bool dry_run = false
 
static bool success = false
 
static struct LogicalRepInfos dbinfos
 
static int num_dbs = 0
 
static int num_pubs = 0
 
static int num_subs = 0
 
static int num_replslots = 0
 
static pg_prng_state prng_state
 
static char * pg_ctl_path = NULL
 
static char * pg_resetwal_path = NULL
 
static char * logdir = NULL
 
static char * subscriber_dir = NULL
 
static bool recovery_ended = false
 
static bool standby_running = false
 
static bool recovery_params_set = false
 

Macro Definition Documentation

◆ DEFAULT_SUB_PORT

#define DEFAULT_SUB_PORT   "50432"

Definition at line 35 of file pg_createsubscriber.c.

◆ INCLUDED_CONF_FILE

#define INCLUDED_CONF_FILE   "pg_createsubscriber.conf"

Definition at line 50 of file pg_createsubscriber.c.

◆ INCLUDED_CONF_FILE_DISABLED

#define INCLUDED_CONF_FILE_DISABLED   INCLUDED_CONF_FILE ".disabled"

Definition at line 51 of file pg_createsubscriber.c.

◆ INTERNAL_LOG_FILE_NAME

#define INTERNAL_LOG_FILE_NAME   "pg_createsubscriber_internal.log"

Definition at line 54 of file pg_createsubscriber.c.

◆ OBJECTTYPE_PUBLICATIONS

#define OBJECTTYPE_PUBLICATIONS   0x0001

Definition at line 36 of file pg_createsubscriber.c.

◆ PG_AUTOCONF_FILENAME

#define PG_AUTOCONF_FILENAME   "postgresql.auto.conf"

Definition at line 49 of file pg_createsubscriber.c.

◆ SERVER_LOG_FILE_NAME

#define SERVER_LOG_FILE_NAME   "pg_createsubscriber_server.log"

Definition at line 53 of file pg_createsubscriber.c.

◆ WAIT_INTERVAL

#define WAIT_INTERVAL   1 /* 1 second */

Definition at line 155 of file pg_createsubscriber.c.

Function Documentation

◆ appendConnStrItem()

static void appendConnStrItem ( PQExpBuffer  buf,
const char * keyword,
const char * val 
)
static

Definition at line 322 of file pg_createsubscriber.c.

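323{
324 if (buf->len > 0)
325 appendPQExpBufferChar(buf, ' ');
326 appendPQExpBufferStr(buf, keyword);
327 appendPQExpBufferChar(buf, '=');
328 appendConnStrVal(buf, val);
329}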
long val
Definition informix.c:689
static char buf[DEFAULT_XLOG_SEG_SIZE]
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
void appendConnStrVal(PQExpBuffer buf, const char *str)

References appendConnStrVal(), appendPQExpBufferChar(), appendPQExpBufferStr(), buf, and val.

Referenced by concat_conninfo_dbname(), get_base_conninfo(), and get_sub_conninfo().

◆ check_and_drop_existing_subscriptions()

static void check_and_drop_existing_subscriptions ( PGconn * conn,
const struct LogicalRepInfo * dbinfo 
)
static

Definition at line 1305 of file pg_createsubscriber.c.

1307{
1309 char *dbname;
1310 PGresult *res;
1311
1312 Assert(conn != NULL);
1313
1314 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1315
1316 appendPQExpBuffer(query,
1317 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1318 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1319 "WHERE d.datname = %s",
1320 dbname);
1321 res = PQexec(conn, query->data);
1322
1323 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1324 {
1325 pg_log_error("could not obtain pre-existing subscriptions: %s",
1328 }
1329
1330 for (int i = 0; i < PQntuples(res); i++)
1332 dbinfo->dbname);
1333
1334 PQclear(res);
1335 destroyPQExpBuffer(query);
1337}
#define Assert(condition)
Definition c.h:943
void PQfreemem(void *ptr)
Definition fe-exec.c:4068
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition fe-exec.c:4418
PGresult * PQexec(PGconn *conn, const char *query)
Definition fe-exec.c:2279
int i
Definition isn.c:77
#define PQresultErrorMessage
#define PQgetvalue
#define PQclear
#define PQresultStatus
#define PQntuples
@ PGRES_TUPLES_OK
Definition libpq-fe.h:134
#define pg_log_error(...)
Definition logging.h:108
static void drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
static void disconnect_database(PGconn *conn, bool exit_on_error)
PQExpBuffer createPQExpBuffer(void)
Definition pqexpbuffer.c:72
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
static int fb(int x)
char * dbname
Definition streamutil.c:49
PGconn * conn
Definition streamutil.c:52

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), PQExpBufferData::data, LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), drop_existing_subscription(), fb(), i, pg_log_error, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQntuples, PQresultErrorMessage, and PQresultStatus.

Referenced by setup_subscriber().

◆ check_and_drop_publications()

static void check_and_drop_publications ( PGconn * conn,
struct LogicalRepInfo * dbinfo 
)
static

Definition at line 1930 of file pg_createsubscriber.c.

1931{
1932 PGresult *res;
1934
1935 Assert(conn != NULL);
1936
1937 if (drop_all_pubs)
1938 {
1939 pg_log_info("dropping all existing publications in database \"%s\"",
1940 dbinfo->dbname);
1941
1942 /* Fetch all publication names */
1943 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1944 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1945 {
1946 pg_log_error("could not obtain publication information: %s",
1948 PQclear(res);
1950 }
1951
1952 /* Drop each publication */
1953 for (int i = 0; i < PQntuples(res); i++)
1954 drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1955 &dbinfo->made_publication);
1956
1957 PQclear(res);
1958 }
1959 else
1960 {
1961 /* Drop publication only if it was created by this tool */
1962 if (dbinfo->made_publication)
1963 {
1964 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1965 &dbinfo->made_publication);
1966 }
1967 else
1968 {
1969 if (dry_run)
1970 pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
1971 dbinfo->pubname, dbinfo->dbname);
1972 else
1973 pg_log_info("preserving existing publication \"%s\" in database \"%s\"",
1974 dbinfo->pubname, dbinfo->dbname);
1975 }
1976 }
1977}
#define pg_log_info(...)
Definition logging.h:126
static struct LogicalRepInfos dbinfos
static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
static bool dry_run
#define OBJECTTYPE_PUBLICATIONS

References Assert, conn, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), dry_run, fb(), i, LogicalRepInfo::made_publication, OBJECTTYPE_PUBLICATIONS, LogicalRepInfos::objecttypes_to_clean, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, and LogicalRepInfo::pubname.

Referenced by setup_subscriber().

◆ check_data_directory()

static void check_data_directory ( const char * datadir)
static

Definition at line 450 of file pg_createsubscriber.c.

451{
452 struct stat statbuf;
453 uint32 major_version;
454 char *version_str;
455
456 pg_log_info("checking if directory \"%s\" is a cluster data directory",
457 datadir);
458
459 if (stat(datadir, &statbuf) != 0)
460 {
461 if (errno == ENOENT)
462 pg_fatal("data directory \"%s\" does not exist", datadir);
463 else
464 pg_fatal("could not access directory \"%s\": %m", datadir);
465 }
466
467 /*
468 * Retrieve the contents of this cluster's PG_VERSION. We require
469 * compatibility with the same major version as the one this tool is
470 * compiled with.
471 */
473 if (major_version != PG_MAJORVERSION_NUM)
474 {
475 pg_log_error("data directory is of wrong version");
476 pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
477 "PG_VERSION", version_str, PG_MAJORVERSION);
478 exit(1);
479 }
480}
uint32_t uint32
Definition c.h:624
uint32 get_pg_version(const char *datadir, char **version_str)
Definition version.c:44
#define pg_log_error_detail(...)
Definition logging.h:111
#define pg_fatal(...)
char * datadir
#define GET_PG_MAJORVERSION_NUM(v)
Definition version.h:19
#define stat
Definition win32_port.h:74

References datadir, fb(), GET_PG_MAJORVERSION_NUM, get_pg_version(), pg_fatal, pg_log_error, pg_log_error_detail, pg_log_info, and stat.

Referenced by main().

◆ check_publisher()

static void check_publisher ( const struct LogicalRepInfo * dbinfo)
static

Definition at line 1016 of file pg_createsubscriber.c.

1017{
1018 PGconn *conn;
1019 PGresult *res;
1020 bool failed = false;
1021
1022 char *wal_level;
1023 int max_repslots;
1024 int cur_repslots;
1025 int max_walsenders;
1026 int cur_walsenders;
1029
1030 pg_log_info("checking settings on publisher");
1031
1032 conn = connect_database(dbinfo[0].pubconninfo, true);
1033
1034 /*
1035 * If the primary server is in recovery (i.e. cascading replication),
1036 * objects (publication) cannot be created because it is read only.
1037 */
1039 {
1040 pg_log_error("primary server cannot be in recovery");
1042 }
1043
1044 /*------------------------------------------------------------------------
1045 * Logical replication requires a few parameters to be set on publisher.
1046 * Since these parameters are not a requirement for physical replication,
1047 * we should check it to make sure it won't fail.
1048 *
1049 * - wal_level >= replica
1050 * - max_replication_slots >= current + number of dbs to be converted
1051 * - max_wal_senders >= current + number of dbs to be converted
1052 * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
1053 * -----------------------------------------------------------------------
1054 */
1055 res = PQexec(conn,
1056 "SELECT pg_catalog.current_setting('wal_level'),"
1057 " pg_catalog.current_setting('max_replication_slots'),"
1058 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
1059 " pg_catalog.current_setting('max_wal_senders'),"
1060 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
1061 " pg_catalog.current_setting('max_prepared_transactions'),"
1062 " pg_catalog.current_setting('max_slot_wal_keep_size')");
1063
1064 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1065 {
1066 pg_log_error("could not obtain publisher settings: %s",
1069 }
1070
1071 wal_level = pg_strdup(PQgetvalue(res, 0, 0));
1072 max_repslots = atoi(PQgetvalue(res, 0, 1));
1073 cur_repslots = atoi(PQgetvalue(res, 0, 2));
1074 max_walsenders = atoi(PQgetvalue(res, 0, 3));
1075 cur_walsenders = atoi(PQgetvalue(res, 0, 4));
1078
1079 PQclear(res);
1080
1081 pg_log_debug("publisher: wal_level: %s", wal_level);
1082 pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
1083 pg_log_debug("publisher: current replication slots: %d", cur_repslots);
1084 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
1085 pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
1086 pg_log_debug("publisher: max_prepared_transactions: %d",
1088 pg_log_debug("publisher: max_slot_wal_keep_size: %s",
1090
1091 disconnect_database(conn, false);
1092
1093 if (strcmp(wal_level, "minimal") == 0)
1094 {
1095 pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
1096 failed = true;
1097 }
1098
1100 {
1101 pg_log_error("publisher requires %d replication slots, but only %d remain",
1103 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1104 "max_replication_slots", cur_repslots + num_dbs);
1105 failed = true;
1106 }
1107
1109 {
1110 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
1112 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1113 "max_wal_senders", cur_walsenders + num_dbs);
1114 failed = true;
1115 }
1116
1118 {
1119 pg_log_warning("two_phase option will not be enabled for replication slots");
1120 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
1121 "Prepared transactions will be replicated at COMMIT PREPARED.");
1122 pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
1123 }
1124
1125 /*
1126 * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
1127 * is set to a non-default value, it may cause replication failures due to
1128 * required WAL files being prematurely removed.
1129 */
1130 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
1131 {
1132 pg_log_warning("required WAL could be removed from the publisher");
1133 pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1134 "max_slot_wal_keep_size");
1135 }
1136
1138
1139 if (failed)
1140 exit(1);
1141}
char * pg_strdup(const char *in)
Definition fe_memutils.c:91
void pg_free(void *ptr)
#define pg_log_error_hint(...)
Definition logging.h:114
#define pg_log_warning_hint(...)
Definition logging.h:123
#define pg_log_warning_detail(...)
Definition logging.h:120
#define pg_log_debug(...)
Definition logging.h:135
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
#define pg_log_warning(...)
Definition pgfnames.c:24
int wal_level
Definition xlog.c:138

References conn, connect_database(), dbinfos, disconnect_database(), dry_run, fb(), num_dbs, pg_free(), pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_log_warning, pg_log_warning_detail, pg_log_warning_hint, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, PQresultStatus, server_is_in_recovery(), LogicalRepInfos::two_phase, and wal_level.

Referenced by main().

◆ check_subscriber()

static void check_subscriber ( const struct LogicalRepInfo * dbinfo)
static

Definition at line 1155 of file pg_createsubscriber.c.

1156{
1157 PGconn *conn;
1158 PGresult *res;
1159 bool failed = false;
1160
1161 int max_lrworkers;
1162 int max_replorigins;
1163 int max_wprocs;
1164
1165 pg_log_info("checking settings on subscriber");
1166
1167 conn = connect_database(dbinfo[0].subconninfo, true);
1168
1169 /* The target server must be a standby */
1171 {
1172 pg_log_error("target server must be a standby");
1174 }
1175
1176 /*------------------------------------------------------------------------
1177 * Logical replication requires a few parameters to be set on subscriber.
1178 * Since these parameters are not a requirement for physical replication,
1179 * we should check it to make sure it won't fail.
1180 *
1181 * - max_active_replication_origins >= number of dbs to be converted
1182 * - max_logical_replication_workers >= number of dbs to be converted
1183 * - max_worker_processes >= 1 + number of dbs to be converted
1184 *------------------------------------------------------------------------
1185 */
1186 res = PQexec(conn,
1187 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1188 "'max_logical_replication_workers', "
1189 "'max_active_replication_origins', "
1190 "'max_worker_processes', "
1191 "'primary_slot_name') "
1192 "ORDER BY name");
1193
1194 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1195 {
1196 pg_log_error("could not obtain subscriber settings: %s",
1199 }
1200
1201 max_replorigins = atoi(PQgetvalue(res, 0, 0));
1202 max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1203 max_wprocs = atoi(PQgetvalue(res, 2, 0));
1204 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1206
1207 pg_log_debug("subscriber: max_logical_replication_workers: %d",
1209 pg_log_debug("subscriber: max_active_replication_origins: %d", max_replorigins);
1210 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1212 pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1213
1214 PQclear(res);
1215
1216 disconnect_database(conn, false);
1217
1219 {
1220 pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1222 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1223 "max_active_replication_origins", num_dbs);
1224 failed = true;
1225 }
1226
1227 if (max_lrworkers < num_dbs)
1228 {
1229 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1231 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1232 "max_logical_replication_workers", num_dbs);
1233 failed = true;
1234 }
1235
1236 if (max_wprocs < num_dbs + 1)
1237 {
1238 pg_log_error("subscriber requires %d worker processes, but only %d remain",
1239 num_dbs + 1, max_wprocs);
1240 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1241 "max_worker_processes", num_dbs + 1);
1242 failed = true;
1243 }
1244
1245 if (failed)
1246 exit(1);
1247}
static char * primary_slot_name

References conn, connect_database(), disconnect_database(), fb(), num_dbs, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, PQresultStatus, primary_slot_name, and server_is_in_recovery().

Referenced by main().

◆ cleanup_objects_atexit()

static void cleanup_objects_atexit ( void  )
static

Definition at line 201 of file pg_createsubscriber.c.

202{
203 /* Rename the included configuration file, if necessary. */
205 {
208
213
215 {
216 /* durable_rename() has already logged something. */
217 pg_log_warning_hint("A manual removal of the recovery parameters might be required.");
218 }
219 }
220
221 if (success)
222 return;
223
224 /*
225 * If the server is promoted, there is no way to use the current setup
226 * again. Warn the user that a new replication setup should be done before
227 * trying again.
228 */
229 if (recovery_ended)
230 {
231 pg_log_warning("failed after the end of recovery");
232 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
233 "You must recreate the physical replica before continuing.");
234 }
235
236 for (int i = 0; i < num_dbs; i++)
237 {
238 struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
239
240 if (dbinfo->made_publication || dbinfo->made_replslot)
241 {
242 PGconn *conn;
243
244 conn = connect_database(dbinfo->pubconninfo, false);
245 if (conn != NULL)
246 {
247 if (dbinfo->made_publication)
248 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
249 &dbinfo->made_publication);
250 if (dbinfo->made_replslot)
251 drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
253 }
254 else
255 {
256 /*
257 * If a connection could not be established, inform the user
258 * that some objects were left on primary and should be
259 * removed before trying again.
260 */
261 if (dbinfo->made_publication)
262 {
263 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
264 dbinfo->pubname,
265 dbinfo->dbname);
266 pg_log_warning_hint("Drop this publication before trying again.");
267 }
268 if (dbinfo->made_replslot)
269 {
270 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
271 dbinfo->replslotname,
272 dbinfo->dbname);
273 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
274 }
275 }
276 }
277 }
278
279 if (standby_running)
281}
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:783
#define MAXPGPATH
static void stop_standby_server(const char *datadir)
static char * subscriber_dir
#define INCLUDED_CONF_FILE
static bool success
static bool recovery_ended
#define INCLUDED_CONF_FILE_DISABLED
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static bool recovery_params_set
#define snprintf
Definition port.h:261
struct LogicalRepInfo * dbinfo

References conn, connect_database(), LogicalRepInfos::dbinfo, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), drop_replication_slot(), durable_rename(), fb(), i, INCLUDED_CONF_FILE, INCLUDED_CONF_FILE_DISABLED, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, MAXPGPATH, num_dbs, pg_log_warning, pg_log_warning_hint, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, recovery_ended, recovery_params_set, LogicalRepInfo::replslotname, snprintf, standby_running, stop_standby_server(), subscriber_dir, and success.

Referenced by main().

◆ concat_conninfo_dbname()

static char * concat_conninfo_dbname ( const char * conninfo,
const char * dbname 
)
static

Definition at line 490 of file pg_createsubscriber.c.

491{
492 PQExpBuffer buf = createPQExpBuffer();
493 char *ret;
494
495 Assert(conninfo != NULL);
496
497 appendPQExpBufferStr(buf, conninfo);
498 appendConnStrItem(buf, "dbname", dbname);
499
500 ret = pg_strdup(buf->data);
501 destroyPQExpBuffer(buf);
502
503 return ret;
504}
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)

References appendConnStrItem(), appendPQExpBufferStr(), Assert, buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), fb(), and pg_strdup().

Referenced by get_publisher_databases(), and store_pub_sub_info().

◆ connect_database()

static PGconn * connect_database ( const char * conninfo,
bool  exit_on_error 
)
static

Definition at line 587 of file pg_createsubscriber.c.

588{
589 PGconn *conn;
590 PGresult *res;
591
592 conn = PQconnectdb(conninfo);
594 {
595 pg_log_error("connection to database failed: %s",
597 PQfinish(conn);
598
599 if (exit_on_error)
600 exit(1);
601 return NULL;
602 }
603
604 /* Secure search_path */
607 {
608 pg_log_error("could not clear \"search_path\": %s",
610 PQclear(res);
611 PQfinish(conn);
612
613 if (exit_on_error)
614 exit(1);
615 return NULL;
616 }
617 PQclear(res);
618
619 return conn;
620}
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition connect.h:25
PGconn * PQconnectdb(const char *conninfo)
Definition fe-connect.c:830
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
@ CONNECTION_OK
Definition libpq-fe.h:90

References ALWAYS_SECURE_SEARCH_PATH_SQL, conn, CONNECTION_OK, fb(), pg_log_error, PGRES_TUPLES_OK, PQclear, PQconnectdb(), PQerrorMessage(), PQexec(), PQfinish(), PQresultErrorMessage, PQresultStatus, and PQstatus().

Referenced by check_publisher(), check_subscriber(), cleanup_objects_atexit(), drop_failover_replication_slots(), drop_primary_replication_slot(), get_primary_sysid(), get_publisher_databases(), setup_publisher(), setup_recovery(), setup_subscriber(), and wait_for_end_recovery().

◆ create_logical_replication_slot()

static char * create_logical_replication_slot ( PGconn * conn,
struct LogicalRepInfo * dbinfo 
)
static

Definition at line 1547 of file pg_createsubscriber.c.

1548{
1550 PGresult *res = NULL;
1551 const char *slot_name = dbinfo->replslotname;
1552 char *slot_name_esc;
1553 char *lsn = NULL;
1554
1555 Assert(conn != NULL);
1556
1557 if (dry_run)
1558 pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1559 slot_name, dbinfo->dbname);
1560 else
1561 pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
1562 slot_name, dbinfo->dbname);
1563
1564 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1565
1567 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1569 dbinfos.two_phase ? "true" : "false");
1570
1572
1573 pg_log_debug("command is: %s", str->data);
1574
1575 if (!dry_run)
1576 {
1577 res = PQexec(conn, str->data);
1578 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1579 {
1580 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1581 slot_name, dbinfo->dbname,
1583 PQclear(res);
1585 return NULL;
1586 }
1587
1588 lsn = pg_strdup(PQgetvalue(res, 0, 0));
1589 PQclear(res);
1590 }
1591
1592 /* For cleanup purposes */
1593 dbinfo->made_replslot = true;
1594
1596
1597 return lsn;
1598}
const char * str

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, fb(), LogicalRepInfo::made_replslot, pg_log_debug, pg_log_error, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQresultErrorMessage, PQresultStatus, LogicalRepInfo::replslotname, str, and LogicalRepInfos::two_phase.

Referenced by setup_publisher().

◆ create_publication()

static void create_publication ( PGconn * conn,
struct LogicalRepInfo * dbinfo 
)
static

Definition at line 1796 of file pg_createsubscriber.c.

1797{
1799 PGresult *res;
1800 char *ipubname_esc;
1801 char *spubname_esc;
1802
1803 Assert(conn != NULL);
1804
1806 spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1807
1808 /* Check if the publication already exists */
1810 "SELECT 1 FROM pg_catalog.pg_publication "
1811 "WHERE pubname = %s",
1812 spubname_esc);
1813 res = PQexec(conn, str->data);
1814 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1815 {
1816 pg_log_error("could not obtain publication information: %s",
1819 }
1820
1821 if (PQntuples(res) == 1)
1822 {
1823 /*
1824 * Unfortunately, if it reaches this code path, it will always fail
1825 * (unless you decide to change the existing publication name). That's
1826 * bad but it is very unlikely that the user will choose a name with
1827 * pg_createsubscriber_ prefix followed by the exact database oid and
1828 * a random number.
1829 */
1830 pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1831 pg_log_error_hint("Consider renaming this publication before continuing.");
1833 }
1834
1835 PQclear(res);
1837
1838 if (dry_run)
1839 pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
1840 dbinfo->pubname, dbinfo->dbname);
1841 else
1842 pg_log_info("creating publication \"%s\" in database \"%s\"",
1843 dbinfo->pubname, dbinfo->dbname);
1844
1845 appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1846 ipubname_esc);
1847
1848 pg_log_debug("command is: %s", str->data);
1849
1850 if (!dry_run)
1851 {
1852 res = PQexec(conn, str->data);
1853 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1854 {
1855 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1856 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1858 }
1859 PQclear(res);
1860 }
1861
1862 /* For cleanup purposes */
1863 dbinfo->made_publication = true;
1864
1868}
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition fe-exec.c:4424
@ PGRES_COMMAND_OK
Definition libpq-fe.h:131
void resetPQExpBuffer(PQExpBuffer str)

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), LogicalRepInfo::made_publication, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear, PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQntuples, PQresultErrorMessage, PQresultStatus, LogicalRepInfo::pubname, resetPQExpBuffer(), and str.

Referenced by setup_publisher().

◆ create_subscription()

static void create_subscription ( PGconn * conn,
const struct LogicalRepInfo * dbinfo 
)
static

Definition at line 1991 of file pg_createsubscriber.c.

1992{
1994 PGresult *res;
1995 char *pubname_esc;
1996 char *subname_esc;
1997 char *pubconninfo_esc;
1998 char *replslotname_esc;
1999
2000 Assert(conn != NULL);
2001
2006
2007 if (dry_run)
2008 pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
2009 dbinfo->subname, dbinfo->dbname);
2010 else
2011 pg_log_info("creating subscription \"%s\" in database \"%s\"",
2012 dbinfo->subname, dbinfo->dbname);
2013
2015 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
2016 "WITH (create_slot = false, enabled = false, "
2017 "slot_name = %s, copy_data = false, two_phase = %s)",
2019 dbinfos.two_phase ? "true" : "false");
2020
2025
2026 pg_log_debug("command is: %s", str->data);
2027
2028 if (!dry_run)
2029 {
2030 res = PQexec(conn, str->data);
2031 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2032 {
2033 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
2034 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
2036 }
2037 PQclear(res);
2038 }
2039
2041}

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, str, LogicalRepInfo::subname, and LogicalRepInfos::two_phase.

Referenced by setup_subscriber().

◆ disconnect_database()

◆ drop_existing_subscription()

static void drop_existing_subscription ( PGconn *conn,
const char *subname,
const char *dbname 
)
static

Definition at line 1256 of file pg_createsubscriber.c.

1257{
1259 PGresult *res;
1260 char *subname_esc;
1261
1262 Assert(conn != NULL);
1263
1265
1266 /*
1267 * Construct a query string. These commands are allowed to be executed
1268 * within a transaction.
1269 */
1270 appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1271 subname_esc);
1272 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1273 subname_esc);
1274 appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname_esc);
1275
1277
1278 if (dry_run)
1279 pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
1280 subname, dbname);
1281 else
1282 {
1283 pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1284 subname, dbname);
1285
1286 res = PQexec(conn, query->data);
1287
1288 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1289 {
1290 pg_log_error("could not drop subscription \"%s\": %s",
1293 }
1294
1295 PQclear(res);
1296 }
1297
1298 destroyPQExpBuffer(query);
1299}
NameData subname

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), PQExpBufferData::data, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, and subname.

Referenced by check_and_drop_existing_subscriptions().

◆ drop_failover_replication_slots()

static void drop_failover_replication_slots ( struct LogicalRepInfo *dbinfo)
static

Definition at line 1505 of file pg_createsubscriber.c.

1506{
1507 PGconn *conn;
1508 PGresult *res;
1509
1510 conn = connect_database(dbinfo[0].subconninfo, false);
1511 if (conn != NULL)
1512 {
1513 /* Get failover replication slot names */
1514 res = PQexec(conn,
1515 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1516
1517 if (PQresultStatus(res) == PGRES_TUPLES_OK)
1518 {
1519 /* Remove failover replication slots from subscriber */
1520 for (int i = 0; i < PQntuples(res); i++)
1521 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1522 }
1523 else
1524 {
1525 pg_log_warning("could not obtain failover replication slot information: %s",
1527 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1528 }
1529
1530 PQclear(res);
1531 disconnect_database(conn, false);
1532 }
1533 else
1534 {
1535 pg_log_warning("could not drop failover replication slot");
1536 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1537 }
1538}

References conn, connect_database(), disconnect_database(), drop_replication_slot(), fb(), i, pg_log_warning, pg_log_warning_hint, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, and PQresultStatus.

Referenced by main().

◆ drop_primary_replication_slot()

static void drop_primary_replication_slot ( struct LogicalRepInfo *dbinfo,
const char *slotname 
)
static

Definition at line 1475 of file pg_createsubscriber.c.

1476{
1477 PGconn *conn;
1478
1479 /* Replication slot does not exist, do nothing */
1480 if (!primary_slot_name)
1481 return;
1482
1483 conn = connect_database(dbinfo[0].pubconninfo, false);
1484 if (conn != NULL)
1485 {
1486 drop_replication_slot(conn, &dbinfo[0], slotname);
1487 disconnect_database(conn, false);
1488 }
1489 else
1490 {
1491 pg_log_warning("could not drop replication slot \"%s\" on primary",
1492 slotname);
1493 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1494 }
1495}

References conn, connect_database(), disconnect_database(), drop_replication_slot(), fb(), pg_log_warning, pg_log_warning_hint, and primary_slot_name.

Referenced by main().

◆ drop_publication()

static void drop_publication ( PGconn *conn,
const char *pubname,
const char *dbname,
bool *made_publication 
)
static

Definition at line 1874 of file pg_createsubscriber.c.

1876{
1878 PGresult *res;
1879 char *pubname_esc;
1880
1881 Assert(conn != NULL);
1882
1883 pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1884
1885 if (dry_run)
1886 pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
1887 pubname, dbname);
1888 else
1889 pg_log_info("dropping publication \"%s\" in database \"%s\"",
1890 pubname, dbname);
1891
1892 appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1893
1895
1896 pg_log_debug("command is: %s", str->data);
1897
1898 if (!dry_run)
1899 {
1900 res = PQexec(conn, str->data);
1901 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1902 {
1903 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1904 pubname, dbname, PQresultErrorMessage(res));
1905 *made_publication = false; /* don't try again. */
1906
1907 /*
1908 * Don't disconnect and exit here. This routine is used by primary
1909 * (cleanup publication / replication slot due to an error) and
1910 * subscriber (remove the replicated publications). In both cases,
1911 * it can continue and provide instructions for the user to remove
1912 * it later if cleanup fails.
1913 */
1914 }
1915 PQclear(res);
1916 }
1917
1919}

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbname, destroyPQExpBuffer(), dry_run, fb(), pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, and str.

Referenced by check_and_drop_publications(), and cleanup_objects_atexit().

◆ drop_replication_slot()

static void drop_replication_slot ( PGconn *conn,
struct LogicalRepInfo *dbinfo,
const char *slot_name 
)
static

Definition at line 1601 of file pg_createsubscriber.c.

1603{
1605 char *slot_name_esc;
1606 PGresult *res;
1607
1608 Assert(conn != NULL);
1609
1610 if (dry_run)
1611 pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1612 slot_name, dbinfo->dbname);
1613 else
1614 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1615 slot_name, dbinfo->dbname);
1616
1617 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1618
1619 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1620
1622
1623 pg_log_debug("command is: %s", str->data);
1624
1625 if (!dry_run)
1626 {
1627 res = PQexec(conn, str->data);
1628 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1629 {
1630 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1631 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1632 dbinfo->made_replslot = false; /* don't try again. */
1633 }
1634
1635 PQclear(res);
1636 }
1637
1639}

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, fb(), LogicalRepInfo::made_replslot, pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, and str.

Referenced by cleanup_objects_atexit(), drop_failover_replication_slots(), and drop_primary_replication_slot().

◆ enable_subscription()

static void enable_subscription ( PGconn *conn,
const struct LogicalRepInfo *dbinfo 
)
static

Definition at line 2149 of file pg_createsubscriber.c.

2150{
2152 PGresult *res;
2153 char *subname;
2154
2155 Assert(conn != NULL);
2156
2157 subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2158
2159 if (dry_run)
2160 pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
2161 dbinfo->subname, dbinfo->dbname);
2162 else
2163 pg_log_info("enabling subscription \"%s\" in database \"%s\"",
2164 dbinfo->subname, dbinfo->dbname);
2165
2166 appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
2167
2168 pg_log_debug("command is: %s", str->data);
2169
2170 if (!dry_run)
2171 {
2172 res = PQexec(conn, str->data);
2173 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2174 {
2175 pg_log_error("could not enable subscription \"%s\": %s",
2176 dbinfo->subname, PQresultErrorMessage(res));
2178 }
2179
2180 PQclear(res);
2181 }
2182
2185}

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ find_publication()

static bool find_publication ( PGconn *conn,
const char *pubname,
const char *dbname 
)
static

Definition at line 832 of file pg_createsubscriber.c.

833{
835 PGresult *res;
836 bool found = false;
837 char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
838
840 "SELECT 1 FROM pg_catalog.pg_publication "
841 "WHERE pubname = %s",
843 res = PQexec(conn, str->data);
845 {
846 pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
847 pubname, dbname, PQerrorMessage(conn));
849 }
850
851 if (PQntuples(res) == 1)
852 found = true;
853
854 PQclear(res);
857
858 return found;
859}

References appendPQExpBuffer(), conn, createPQExpBuffer(), dbname, destroyPQExpBuffer(), disconnect_database(), fb(), pg_log_error, PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQescapeLiteral(), PQexec(), PQfreemem(), PQntuples, PQresultStatus, and str.

Referenced by setup_publisher().

◆ generate_object_name()

static char * generate_object_name ( PGconn *conn)
static

Definition at line 786 of file pg_createsubscriber.c.

787{
788 PGresult *res;
789 Oid oid;
790 uint32 rand;
791 char *objname;
792
793 res = PQexec(conn,
794 "SELECT oid FROM pg_catalog.pg_database "
795 "WHERE datname = pg_catalog.current_database()");
797 {
798 pg_log_error("could not obtain database OID: %s",
801 }
802
803 if (PQntuples(res) != 1)
804 {
805 pg_log_error("could not obtain database OID: got %d rows, expected %d row",
806 PQntuples(res), 1);
808 }
809
810 /* Database OID */
811 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
812
813 PQclear(res);
814
815 /* Random unsigned integer */
817
818 /*
819 * Build the object name. The name must not exceed NAMEDATALEN - 1. This
820 * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
821 * '\0').
822 */
823 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
824
825 return objname;
826}
static pg_prng_state prng_state
uint32 pg_prng_uint32(pg_prng_state *state)
Definition pg_prng.c:227
unsigned int Oid
char * psprintf(const char *fmt,...)
Definition psprintf.c:43

References conn, disconnect_database(), fb(), pg_log_error, pg_prng_uint32(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, prng_state, and psprintf().

Referenced by setup_publisher().

◆ get_base_conninfo()

static char * get_base_conninfo ( const char *conninfo,
char **  dbname 
)
static

Definition at line 344 of file pg_createsubscriber.c.

345{
349 char *errmsg = NULL;
350 char *ret;
351
352 conn_opts = PQconninfoParse(conninfo, &errmsg);
353 if (conn_opts == NULL)
354 {
355 pg_log_error("could not parse connection string: %s", errmsg);
357 return NULL;
358 }
359
361 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
362 {
363 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
364 {
365 if (strcmp(conn_opt->keyword, "dbname") == 0)
366 {
367 if (dbname)
368 *dbname = pg_strdup(conn_opt->val);
369 continue;
370 }
371 appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
372 }
373 }
374
375 ret = pg_strdup(buf->data);
376
379
380 return ret;
381}
void PQconninfoFree(PQconninfoOption *connOptions)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
static char * errmsg

References appendConnStrItem(), buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), errmsg, fb(), pg_log_error, pg_strdup(), PQconninfoFree(), PQconninfoParse(), and PQfreemem().

Referenced by main().

◆ get_exec_path()

static char * get_exec_path ( const char *argv0,
const char *progname 
)
static

Definition at line 414 of file pg_createsubscriber.c.

415{
416 char *versionstr;
417 char *exec_path;
418 int ret;
419
420 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
423
424 if (ret < 0)
425 {
426 char full_path[MAXPGPATH];
427
428 if (find_my_exec(argv0, full_path) < 0)
430
431 if (ret == -1)
432 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
433 progname, "pg_createsubscriber", full_path);
434 else
435 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
436 progname, full_path, "pg_createsubscriber");
437 }
438
439 pg_log_debug("%s path is: %s", progname, exec_path);
440
441 return exec_path;
442}
int find_my_exec(const char *argv0, char *retpath)
Definition exec.c:161
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition exec.c:311
void * pg_malloc(size_t size)
Definition fe_memutils.c:53
static const char * progname
static char * argv0
Definition pg_ctl.c:94
static char * exec_path
Definition pg_ctl.c:89
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45

References argv0, exec_path, fb(), find_my_exec(), find_other_exec(), MAXPGPATH, pg_fatal, pg_log_debug, pg_malloc(), progname, psprintf(), and strlcpy().

Referenced by main().

◆ get_primary_sysid()

static uint64 get_primary_sysid ( const char *conninfo)
static

Definition at line 642 of file pg_createsubscriber.c.

643{
644 PGconn *conn;
645 PGresult *res;
647
648 pg_log_info("getting system identifier from publisher");
649
650 conn = connect_database(conninfo, true);
651
652 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
654 {
655 pg_log_error("could not get system identifier: %s",
658 }
659 if (PQntuples(res) != 1)
660 {
661 pg_log_error("could not get system identifier: got %d rows, expected %d row",
662 PQntuples(res), 1);
664 }
665
666 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
667
668 pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
669
670 PQclear(res);
672
673 return sysid;
674}
uint64_t uint64
Definition c.h:625

References conn, connect_database(), disconnect_database(), fb(), pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, and PQresultStatus.

Referenced by main().

◆ get_publisher_databases()

static void get_publisher_databases ( struct CreateSubscriberOptions *opt,
bool  dbnamespecified 
)
static

Definition at line 2193 of file pg_createsubscriber.c.

2195{
2196 PGconn *conn;
2197 PGresult *res;
2198
2199 /* If a database name was specified, just connect to it. */
2200 if (dbnamespecified)
2202 else
2203 {
2204 /* Otherwise, try postgres first and then template1. */
2205 char *conninfo;
2206
2207 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2208 conn = connect_database(conninfo, false);
2209 pg_free(conninfo);
2210 if (!conn)
2211 {
2212 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2213 conn = connect_database(conninfo, true);
2214 pg_free(conninfo);
2215 }
2216 }
2217
2218 res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2219 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2220 {
2221 pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2222 PQclear(res);
2224 }
2225
2226 for (int i = 0; i < PQntuples(res); i++)
2227 {
2228 const char *dbname = PQgetvalue(res, i, 0);
2229
2231
2232 /* Increment num_dbs to reflect multiple --database options */
2233 num_dbs++;
2234 }
2235
2236 PQclear(res);
2237 disconnect_database(conn, false);
2238}
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition simple_list.c:63
SimpleStringList database_names

References concat_conninfo_dbname(), conn, connect_database(), CreateSubscriberOptions::database_names, dbname, disconnect_database(), fb(), i, num_dbs, pg_free(), pg_log_error, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, CreateSubscriberOptions::pub_conninfo_str, and simple_string_list_append().

Referenced by main().

◆ get_standby_sysid()

static uint64 get_standby_sysid ( const char *datadir)
static

Definition at line 682 of file pg_createsubscriber.c.

683{
685 bool crc_ok;
687
688 pg_log_info("getting system identifier from subscriber");
689
691 if (!crc_ok)
692 pg_fatal("control file appears to be corrupt");
693
694 sysid = cf->system_identifier;
695
696 pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
697
698 pg_free(cf);
699
700 return sysid;
701}
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)

References datadir, fb(), get_controlfile(), pg_fatal, pg_free(), and pg_log_info.

Referenced by main().

◆ get_sub_conninfo()

static char * get_sub_conninfo ( const struct CreateSubscriberOptions *opt)
static

Definition at line 388 of file pg_createsubscriber.c.

389{
391 char *ret;
392
393 appendConnStrItem(buf, "port", opt->sub_port);
394#if !defined(WIN32)
395 appendConnStrItem(buf, "host", opt->socket_dir);
396#endif
397 if (opt->sub_username != NULL)
398 appendConnStrItem(buf, "user", opt->sub_username);
399 appendConnStrItem(buf, "fallback_application_name", progname);
400
401 ret = pg_strdup(buf->data);
402
404
405 return ret;
406}

References appendConnStrItem(), buf, createPQExpBuffer(), destroyPQExpBuffer(), fb(), pg_strdup(), progname, CreateSubscriberOptions::socket_dir, CreateSubscriberOptions::sub_port, and CreateSubscriberOptions::sub_username.

Referenced by main().

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 2241 of file pg_createsubscriber.c.

2242{
2243 static struct option long_options[] =
2244 {
2245 {"all", no_argument, NULL, 'a'},
2246 {"database", required_argument, NULL, 'd'},
2247 {"pgdata", required_argument, NULL, 'D'},
2248 {"logdir", required_argument, NULL, 'l'},
2249 {"dry-run", no_argument, NULL, 'n'},
2250 {"subscriber-port", required_argument, NULL, 'p'},
2251 {"publisher-server", required_argument, NULL, 'P'},
2252 {"socketdir", required_argument, NULL, 's'},
2253 {"recovery-timeout", required_argument, NULL, 't'},
2254 {"enable-two-phase", no_argument, NULL, 'T'},
2255 {"subscriber-username", required_argument, NULL, 'U'},
2256 {"verbose", no_argument, NULL, 'v'},
2257 {"version", no_argument, NULL, 'V'},
2258 {"help", no_argument, NULL, '?'},
2259 {"config-file", required_argument, NULL, 1},
2260 {"publication", required_argument, NULL, 2},
2261 {"replication-slot", required_argument, NULL, 3},
2262 {"subscription", required_argument, NULL, 4},
2263 {"clean", required_argument, NULL, 5},
2264 {NULL, 0, NULL, 0}
2265 };
2266
2267 struct CreateSubscriberOptions opt = {0};
2268
2269 int c;
2270 int option_index;
2271
2272 char *pub_base_conninfo;
2273 char *sub_base_conninfo;
2274 char *dbname_conninfo = NULL;
2275
2278 struct stat statbuf;
2279
2280 char *consistent_lsn;
2281
2282 char pidfile[MAXPGPATH];
2283
2284 pg_logging_init(argv[0]);
2286 progname = get_progname(argv[0]);
2287 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2288
2289 if (argc > 1)
2290 {
2291 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2292 {
2293 usage();
2294 exit(0);
2295 }
2296 else if (strcmp(argv[1], "-V") == 0
2297 || strcmp(argv[1], "--version") == 0)
2298 {
2299 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2300 exit(0);
2301 }
2302 }
2303
2304 /* Default settings */
2306 opt.config_file = NULL;
2307 opt.log_dir = NULL;
2308 opt.pub_conninfo_str = NULL;
2309 opt.socket_dir = NULL;
2311 opt.sub_username = NULL;
2312 opt.two_phase = false;
2314 {
2315 0
2316 };
2317 opt.recovery_timeout = 0;
2318 opt.all_dbs = false;
2319
2320 /*
2321 * Don't allow it to be run as root. It uses pg_ctl which does not allow
2322 * it either.
2323 */
2324#ifndef WIN32
2325 if (geteuid() == 0)
2326 {
2327 pg_log_error("cannot be executed by \"root\"");
2328 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2329 progname);
2330 exit(1);
2331 }
2332#endif
2333
2335
2336 while ((c = getopt_long(argc, argv, "ad:D:l:np:P:s:t:TU:v",
2337 long_options, &option_index)) != -1)
2338 {
2339 switch (c)
2340 {
2341 case 'a':
2342 opt.all_dbs = true;
2343 break;
2344 case 'd':
2346 {
2348 num_dbs++;
2349 }
2350 else
2351 pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2352 break;
2353 case 'D':
2356 break;
2357 case 'l':
2358 opt.log_dir = pg_strdup(optarg);
2360 break;
2361 case 'n':
2362 dry_run = true;
2363 break;
2364 case 'p':
2365 opt.sub_port = pg_strdup(optarg);
2366 break;
2367 case 'P':
2369 break;
2370 case 's':
2373 break;
2374 case 't':
2376 break;
2377 case 'T':
2378 opt.two_phase = true;
2379 break;
2380 case 'U':
2382 break;
2383 case 'v':
2385 break;
2386 case 1:
2388 break;
2389 case 2:
2391 {
2393 num_pubs++;
2394 }
2395 else
2396 pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
2397 break;
2398 case 3:
2400 {
2402 num_replslots++;
2403 }
2404 else
2405 pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2406 break;
2407 case 4:
2409 {
2411 num_subs++;
2412 }
2413 else
2414 pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2415 break;
2416 case 5:
2419 else
2420 pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
2421 break;
2422 default:
2423 /* getopt_long already emitted a complaint */
2424 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2425 exit(1);
2426 }
2427 }
2428
2429 /* Validate that --all is not used with incompatible options */
2430 if (opt.all_dbs)
2431 {
2432 char *bad_switch = NULL;
2433
2434 if (num_dbs > 0)
2435 bad_switch = "--database";
2436 else if (num_pubs > 0)
2437 bad_switch = "--publication";
2438 else if (num_replslots > 0)
2439 bad_switch = "--replication-slot";
2440 else if (num_subs > 0)
2441 bad_switch = "--subscription";
2442
2443 if (bad_switch)
2444 {
2445 pg_log_error("options %s and %s cannot be used together",
2446 bad_switch, "-a/--all");
2447 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2448 exit(1);
2449 }
2450 }
2451
2452 /* Any non-option arguments? */
2453 if (optind < argc)
2454 {
2455 pg_log_error("too many command-line arguments (first is \"%s\")",
2456 argv[optind]);
2457 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2458 exit(1);
2459 }
2460
2461 /* Required arguments */
2462 if (subscriber_dir == NULL)
2463 {
2464 pg_log_error("no subscriber data directory specified");
2465 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2466 exit(1);
2467 }
2468
2469 /* If socket directory is not provided, use the current directory */
2470 if (opt.socket_dir == NULL)
2471 {
2472 char cwd[MAXPGPATH];
2473
2474 if (!getcwd(cwd, MAXPGPATH))
2475 pg_fatal("could not determine current directory");
2476 opt.socket_dir = pg_strdup(cwd);
2478 }
2479
2480 /*
2481 * Parse connection string. Build a base connection string that might be
2482 * reused by multiple databases.
2483 */
2484 if (opt.pub_conninfo_str == NULL)
2485 {
2486 /*
2487 * TODO use primary_conninfo (if available) from subscriber and
2488 * extract publisher connection string. Assume that there are
2489 * identical entries for physical and logical replication. If there is
2490 * not, we would fail anyway.
2491 */
2492 pg_log_error("no publisher connection string specified");
2493 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2494 exit(1);
2495 }
2496
2497 if (opt.log_dir != NULL)
2498 {
2499 char *internal_log_file;
2501
2503
2504 /*
2505 * Set mask based on PGDATA permissions, needed for the creation of
2506 * the output directories with correct permissions, similar with
2507 * pg_ctl and pg_upgrade.
2508 *
2509 * Don't error here if the data directory cannot be stat'd. Upcoming
2510 * checks for the data directory would raise the fatal error later.
2511 */
2514
2517
2520 pg_fatal("could not open log file \"%s\": %m", internal_log_file);
2521
2523
2525 }
2526
2527 if (dry_run)
2528 pg_log_info("Executing in dry-run mode.\n"
2529 "The target directory will not be modified.");
2530
2531 pg_log_info("validating publisher connection string");
2534 if (pub_base_conninfo == NULL)
2535 exit(1);
2536
2537 pg_log_info("validating subscriber connection string");
2539
2540 /*
2541 * Fetch all databases from the source (publisher) and treat them as if
2542 * the user had specified multiple --database options, one for each source
2543 * database.
2544 */
2545 if (opt.all_dbs)
2546 {
2548
2550 }
2551
2552 if (opt.database_names.head == NULL)
2553 {
2554 pg_log_info("no database was specified");
2555
2556 /*
2557 * Try to obtain the dbname from the publisher conninfo. If dbname
2558 * parameter is not available, error out.
2559 */
2560 if (dbname_conninfo)
2561 {
2563 num_dbs++;
2564
2565 pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2567 }
2568 else
2569 {
2570 pg_log_error("no database name specified");
2571 pg_log_error_hint("Try \"%s --help\" for more information.",
2572 progname);
2573 exit(1);
2574 }
2575 }
2576
2577 /* Number of object names must match number of databases */
2578 if (num_pubs > 0 && num_pubs != num_dbs)
2579 {
2580 pg_log_error("wrong number of publication names specified");
2581 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2582 num_pubs, num_dbs);
2583 exit(1);
2584 }
2585 if (num_subs > 0 && num_subs != num_dbs)
2586 {
2587 pg_log_error("wrong number of subscription names specified");
2588 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2589 num_subs, num_dbs);
2590 exit(1);
2591 }
2592 if (num_replslots > 0 && num_replslots != num_dbs)
2593 {
2594 pg_log_error("wrong number of replication slot names specified");
2595 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2597 exit(1);
2598 }
2599
2600 /* Verify the object types specified for removal from the subscriber */
2601 for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2602 {
2603 if (pg_strcasecmp(cell->val, "publications") == 0)
2605 else
2606 {
2607 pg_log_error("invalid object type \"%s\" specified for option %s",
2608 cell->val, "--clean");
2609 pg_log_error_hint("The valid value is: \"%s\"", "publications");
2610 exit(1);
2611 }
2612 }
2613
2614 /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2615 pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2616 pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2617
2618 /* Rudimentary check for a data directory */
2620
2622
2623 /*
2624 * Store database information for publisher and subscriber. It should be
2625 * called before atexit() because its return is used in the
2626 * cleanup_objects_atexit().
2627 */
2629
2630 /* Register a function to clean up objects in case of failure */
2632
2633 /*
2634 * Check if the subscriber data directory has the same system identifier
2635 * as the publisher data directory.
2636 */
2639 if (pub_sysid != sub_sysid)
2640 pg_fatal("subscriber data directory is not a copy of the source database cluster");
2641
2642 /* Subscriber PID file */
2643 snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2644
2645 /*
2646 * The standby server must not be running. If the server is started under
2647 * service manager and pg_createsubscriber stops it, the service manager
2648 * might react to this action and start the server again. Therefore,
2649 * refuse to proceed if the server is running to avoid possible failures.
2650 */
2651 if (stat(pidfile, &statbuf) == 0)
2652 {
2653 pg_log_error("standby server is running");
2654 pg_log_error_hint("Stop the standby server and try again.");
2655 exit(1);
2656 }
2657
2658 /*
2659 * Start a short-lived standby server with temporary parameters (provided
2660 * by command-line options). The goal is to avoid connections during the
2661 * transformation steps.
2662 */
2663 pg_log_info("starting the standby server with command-line options");
2664 start_standby_server(&opt, true, false);
2665
2666 /* Check if the standby server is ready for logical replication */
2668
2669 /* Check if the primary server is ready for logical replication */
2671
2672 /*
2673 * Stop the target server. The recovery process requires that the server
2674 * reaches a consistent state before targeting the recovery stop point.
2675 * Make sure a consistent state is reached (stopping the target server
2676 * guarantees it) *before* creating the replication slots in
2677 * setup_publisher().
2678 */
2679 pg_log_info("stopping the subscriber");
2681
2682 /* Create the required objects for each database on publisher */
2684
2685 /* Write the required recovery parameters */
2687
2688 /*
2689 * Start subscriber so the recovery parameters will take effect. Wait
2690 * until accepting connections. We don't want to start logical replication
2691 * during setup.
2692 */
2693 pg_log_info("starting the subscriber");
2694 start_standby_server(&opt, true, true);
2695
2696 /* Wait for the subscriber to be promoted */
2698
2699 /*
2700 * Create the subscription for each database on subscriber. It does not
2701 * enable it immediately because it needs to adjust the replication start
2702 * point to the LSN reported by setup_publisher(). It also cleans up
2703 * publications created by this tool and replication to the standby.
2704 */
2706
2707 /* Remove primary_slot_name if it exists on primary */
2709
2710 /* Remove failover replication slots if they exist on subscriber */
2712
2713 /* Stop the subscriber */
2714 pg_log_info("stopping the subscriber");
2716
2717 /* Change system identifier from subscriber */
2719
2720 success = true;
2721
2722 pg_log_info("Done!");
2723
2724 return 0;
2725}
#define PG_TEXTDOMAIN(domain)
Definition c.h:1303
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition exec.c:430
int pg_mode_mask
Definition file_perm.c:25
bool GetDataDirectoryCreatePerm(const char *dataDir)
#define PG_MODE_MASK_OWNER
Definition file_perm.h:24
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition getopt_long.c:60
#define no_argument
Definition getopt_long.h:25
#define required_argument
Definition getopt_long.h:26
void pg_logging_increase_verbosity(void)
Definition logging.c:187
void pg_logging_set_logfile(FILE *logfile)
Definition logging.c:210
void pg_logging_init(const char *argv0)
Definition logging.c:85
void pg_logging_set_level(enum pg_log_level new_level)
Definition logging.c:178
@ PG_LOG_WARNING
Definition logging.h:38
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
#define INTERNAL_LOG_FILE_NAME
static char * pg_ctl_path
static char * logdir
static char * get_exec_path(const char *argv0, const char *progname)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static void check_data_directory(const char *datadir)
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)
static char * pg_resetwal_path
static void make_output_dirs(const char *log_basedir)
static void usage(void)
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
PGDLLIMPORT int optind
Definition getopt.c:47
PGDLLIMPORT char * optarg
Definition getopt.c:49
int pg_strcasecmp(const char *s1, const char *s2)
void canonicalize_path(char *path)
Definition path.c:337
const char * get_progname(const char *argv0)
Definition path.c:669
char * c
void get_restricted_token(void)
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition simple_list.c:87
SimpleStringList objecttypes_to_clean
SimpleStringList replslot_names
SimpleStringListCell * head
Definition simple_list.h:42

References CreateSubscriberOptions::all_dbs, canonicalize_path(), check_data_directory(), check_publisher(), check_subscriber(), cleanup_objects_atexit(), CreateSubscriberOptions::config_file, CreateSubscriberOptions::database_names, LogicalRepInfos::dbinfo, dbinfos, DEFAULT_SUB_PORT, drop_failover_replication_slots(), drop_primary_replication_slot(), dry_run, fb(), get_base_conninfo(), get_exec_path(), get_primary_sysid(), get_progname(), get_publisher_databases(), get_restricted_token(), get_standby_sysid(), get_sub_conninfo(), GetDataDirectoryCreatePerm(), getopt_long(), SimpleStringList::head, INTERNAL_LOG_FILE_NAME, CreateSubscriberOptions::log_dir, logdir, make_output_dirs(), MAXPGPATH, modify_subscriber_sysid(), no_argument, num_dbs, num_pubs, num_replslots, num_subs, OBJECTTYPE_PUBLICATIONS, CreateSubscriberOptions::objecttypes_to_clean, LogicalRepInfos::objecttypes_to_clean, optarg, optind, pg_ctl_path, pg_fatal, pg_free(), pg_log_error, pg_log_error_detail, pg_log_error_hint, pg_log_info, PG_LOG_WARNING, pg_logging_increase_verbosity(), pg_logging_init(), pg_logging_set_level(), pg_logging_set_logfile(), pg_mode_mask, PG_MODE_MASK_OWNER, pg_resetwal_path, pg_strcasecmp(), pg_strdup(), PG_TEXTDOMAIN, primary_slot_name, progname, psprintf(), CreateSubscriberOptions::pub_conninfo_str, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, CreateSubscriberOptions::recovery_timeout, CreateSubscriberOptions::replslot_names, required_argument, set_pglocale_pgservice(), setup_publisher(), setup_recovery(), setup_subscriber(), simple_string_list_append(), simple_string_list_member(), snprintf, CreateSubscriberOptions::socket_dir, start_standby_server(), stat, stop_standby_server(), store_pub_sub_info(), CreateSubscriberOptions::sub_names, CreateSubscriberOptions::sub_port, CreateSubscriberOptions::sub_username, LogicalRepInfo::subconninfo, subscriber_dir, success, CreateSubscriberOptions::two_phase, LogicalRepInfos::two_phase, usage(), and 
wait_for_end_recovery().

◆ make_output_dirs()

static void make_output_dirs ( const char * log_basedir)
static

Definition at line 979 of file pg_createsubscriber.c.

980{
981 char timestamp[128];
982 struct timeval tval;
983 time_t now;
984 struct tm tmbuf;
985
986 /* Generate timestamp */
988 now = tval.tv_sec;
989
990 strftime(timestamp, sizeof(timestamp), "%Y%m%dT%H%M%S",
991 localtime_r(&now, &tmbuf));
992
993 /* Append milliseconds */
995 sizeof(timestamp) - strlen(timestamp), ".%03u",
996 (unsigned int) (tval.tv_usec / 1000));
997
998 /* Build timestamp directory path */
1000
1001 /* Create base directory (ignore if exists) */
1003 pg_fatal("could not create directory \"%s\": %m", log_basedir);
1004
1005 /* Create a timestamp-named subdirectory under the base directory */
1007 pg_fatal("could not create directory \"%s\": %m", logdir);
1008}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1613
int pg_dir_create_mode
Definition file_perm.c:18
static struct pg_tm tm
Definition localtime.c:104
int64 timestamp
#define mkdir(a, b)
Definition win32_port.h:80
int gettimeofday(struct timeval *tp, void *tzp)

References fb(), gettimeofday(), logdir, mkdir, now(), pg_dir_create_mode, pg_fatal, psprintf(), snprintf, and tm.

Referenced by main().

◆ modify_subscriber_sysid()

static void modify_subscriber_sysid ( const struct CreateSubscriberOptions * opt)
static

Definition at line 709 of file pg_createsubscriber.c.

710{
712 bool crc_ok;
713 struct timeval tv;
714
715 char *out_file;
716 char *cmd_str;
717
718 pg_log_info("modifying system identifier of subscriber");
719
721 if (!crc_ok)
722 pg_fatal("control file appears to be corrupt");
723
724 /*
725 * Select a new system identifier.
726 *
727 * XXX this code was extracted from BootStrapXLOG().
728 */
729 gettimeofday(&tv, NULL);
730 cf->system_identifier = ((uint64) tv.tv_sec) << 32;
731 cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
732 cf->system_identifier |= getpid() & 0xFFF;
733
734 if (dry_run)
735 pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
736 cf->system_identifier);
737 else
738 {
740 pg_log_info("system identifier is %" PRIu64 " on subscriber",
741 cf->system_identifier);
742 }
743
744 if (dry_run)
745 pg_log_info("dry-run: would run pg_resetwal on the subscriber");
746 else
747 pg_log_info("running pg_resetwal on the subscriber");
748
749 /*
750 * Redirecting the output to the logfile if specified. Since the output
751 * would be very short, around one line, we do not provide a separate file
752 * for it; it's done as a part of the server log.
753 */
754 if (opt->log_dir)
756 else
758
759 cmd_str = psprintf("\"%s\" -D \"%s\" >> \"%s\"", pg_resetwal_path,
761 if (opt->log_dir)
763
764 pg_log_debug("pg_resetwal command is: %s", cmd_str);
765
766 if (!dry_run)
767 {
768 int rc = system(cmd_str);
769
770 if (rc == 0)
771 pg_log_info("successfully reset WAL on the subscriber");
772 else
773 pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
774 }
775
776 pg_free(cf);
778}
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
#define SERVER_LOG_FILE_NAME
#define DEVNULL
Definition port.h:162
char * wait_result_to_str(int exitstatus)
Definition wait_error.c:33

References DEVNULL, dry_run, fb(), get_controlfile(), gettimeofday(), CreateSubscriberOptions::log_dir, logdir, pg_fatal, pg_free(), pg_log_debug, pg_log_info, pg_resetwal_path, psprintf(), SERVER_LOG_FILE_NAME, subscriber_dir, update_controlfile(), and wait_result_to_str().

Referenced by main().

◆ pg_ctl_status()

static void pg_ctl_status ( const char * pg_ctl_cmd,
int  rc 
)
static

Definition at line 1645 of file pg_createsubscriber.c.

1646{
1647 if (rc != 0)
1648 {
1649 if (WIFEXITED(rc))
1650 {
1651 pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
1652 }
1653 else if (WIFSIGNALED(rc))
1654 {
1655#if defined(WIN32)
1656 pg_log_error("pg_ctl was terminated by exception 0x%X",
1657 WTERMSIG(rc));
1658 pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1659#else
1660 pg_log_error("pg_ctl was terminated by signal %d: %s",
1661 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1662#endif
1663 }
1664 else
1665 {
1666 pg_log_error("pg_ctl exited with unrecognized status %d", rc);
1667 }
1668
1669 pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
1670 exit(1);
1671 }
1672}
const char * pg_strsignal(int signum)
Definition pgstrsignal.c:39
#define WIFEXITED(w)
Definition win32_port.h:150
#define WIFSIGNALED(w)
Definition win32_port.h:151
#define WTERMSIG(w)
Definition win32_port.h:153
#define WEXITSTATUS(w)
Definition win32_port.h:152

References fb(), pg_log_error, pg_log_error_detail, pg_strsignal(), WEXITSTATUS, WIFEXITED, WIFSIGNALED, and WTERMSIG.

Referenced by start_standby_server(), and stop_standby_server().

◆ server_is_in_recovery()

static bool server_is_in_recovery ( PGconn * conn)
static

Definition at line 956 of file pg_createsubscriber.c.

957{
958 PGresult *res;
959 int ret;
960
961 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
962
964 {
965 pg_log_error("could not obtain recovery progress: %s",
968 }
969
970
971 ret = strcmp("t", PQgetvalue(res, 0, 0));
972
973 PQclear(res);
974
975 return ret == 0;
976}

References conn, disconnect_database(), fb(), pg_log_error, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, and PQresultStatus.

Referenced by check_publisher(), check_subscriber(), and wait_for_end_recovery().

◆ set_replication_progress()

static void set_replication_progress ( PGconn * conn,
const struct LogicalRepInfo * dbinfo,
const char * lsn 
)
static

Definition at line 2054 of file pg_createsubscriber.c.

2055{
2057 PGresult *res;
2058 Oid suboid;
2059 char *subname;
2060 char *dbname;
2061 char *originname;
2062 char *lsnstr;
2063
2064 Assert(conn != NULL);
2065
2066 subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
2067 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
2068
2070 "SELECT s.oid FROM pg_catalog.pg_subscription s "
2071 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
2072 "WHERE s.subname = %s AND d.datname = %s",
2073 subname, dbname);
2074
2075 res = PQexec(conn, str->data);
2076 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2077 {
2078 pg_log_error("could not obtain subscription OID: %s",
2081 }
2082
2083 if (PQntuples(res) != 1 && !dry_run)
2084 {
2085 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
2086 PQntuples(res), 1);
2088 }
2089
2090 if (dry_run)
2091 {
2094 }
2095 else
2096 {
2097 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2098 lsnstr = psprintf("%s", lsn);
2099 }
2100
2101 PQclear(res);
2102
2103 /*
2104 * The origin name is defined as pg_%u. %u is the subscription OID. See
2105 * ApplyWorkerMain().
2106 */
2107 originname = psprintf("pg_%u", suboid);
2108
2109 if (dry_run)
2110 pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2111 originname, lsnstr, dbinfo->dbname);
2112 else
2113 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2114 originname, lsnstr, dbinfo->dbname);
2115
2118 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
2120
2121 pg_log_debug("command is: %s", str->data);
2122
2123 if (!dry_run)
2124 {
2125 res = PQexec(conn, str->data);
2126 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2127 {
2128 pg_log_error("could not set replication progress for subscription \"%s\": %s",
2129 dbinfo->subname, PQresultErrorMessage(res));
2131 }
2132 PQclear(res);
2133 }
2134
2138 pg_free(lsnstr);
2140}
#define InvalidOid
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), InvalidOid, InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, psprintf(), resetPQExpBuffer(), str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ setup_publisher()

static char * setup_publisher ( struct LogicalRepInfo * dbinfo)
static

Definition at line 868 of file pg_createsubscriber.c.

869{
870 char *lsn = NULL;
871
872 pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
873
874 for (int i = 0; i < num_dbs; i++)
875 {
876 PGconn *conn;
877 char *genname = NULL;
878
879 conn = connect_database(dbinfo[i].pubconninfo, true);
880
881 /*
882 * If an object name was not specified as command-line options, assign
883 * a generated object name. The replication slot has a different rule.
884 * The subscription name is assigned to the replication slot name if
885 * no replication slot is specified. It follows the same rule as
886 * CREATE SUBSCRIPTION.
887 */
888 if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
890 if (num_pubs == 0)
891 dbinfo[i].pubname = pg_strdup(genname);
892 if (num_subs == 0)
893 dbinfo[i].subname = pg_strdup(genname);
894 if (num_replslots == 0)
895 dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
896
897 if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
898 {
899 /* Reuse existing publication on publisher. */
900 pg_log_info("using existing publication \"%s\" in database \"%s\"",
901 dbinfo[i].pubname, dbinfo[i].dbname);
902 /* Don't remove pre-existing publication if an error occurs. */
903 dbinfo[i].made_publication = false;
904 }
905 else
906 {
907 /*
908 * Create publication on publisher. This step should be executed
909 * *before* promoting the subscriber to avoid any transactions
910 * between consistent LSN and the new publication rows (such
911 * transactions wouldn't see the new publication rows resulting in
912 * an error).
913 */
914 create_publication(conn, &dbinfo[i]);
915 }
916
917 /* Create replication slot on publisher */
918 if (lsn)
919 pg_free(lsn);
920 lsn = create_logical_replication_slot(conn, &dbinfo[i]);
921 if (lsn == NULL && !dry_run)
922 exit(1);
923
924 /*
925 * Since we are using the LSN returned by the last replication slot as
926 * recovery_target_lsn, this LSN is ahead of the current WAL position
927 * and the recovery waits until the publisher writes a WAL record to
928 * reach the target and ends the recovery. On idle systems, this wait
929 * time is unpredictable and could lead to failure in promoting the
930 * subscriber. To avoid that, insert a harmless WAL record.
931 */
932 if (i == num_dbs - 1 && !dry_run)
933 {
934 PGresult *res;
935
936 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
938 {
939 pg_log_error("could not write an additional WAL record: %s",
942 }
943 PQclear(res);
944 }
945
947 }
948
949 return lsn;
950}
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
static bool find_publication(PGconn *conn, const char *pubname, const char *dbname)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static char * generate_object_name(PGconn *conn)
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition pg_prng.c:89

References conn, connect_database(), create_logical_replication_slot(), create_publication(), dbname, disconnect_database(), dry_run, fb(), find_publication(), generate_object_name(), i, LogicalRepInfo::made_publication, num_dbs, num_pubs, num_replslots, num_subs, pg_free(), pg_log_error, pg_log_info, pg_prng_seed(), pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQresultErrorMessage, PQresultStatus, prng_state, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, LogicalRepInfo::subname, and subname.

Referenced by main().

◆ setup_recovery()

static void setup_recovery ( const struct LogicalRepInfo * dbinfo,
const char * datadir,
const char * lsn 
)
static

Definition at line 1381 of file pg_createsubscriber.c.

1382{
1383 PGconn *conn;
1385
1386 /*
1387 * Although the recovery parameters will be written to the subscriber,
1388 * use a publisher connection. The primary_conninfo is generated using the
1389 * connection settings.
1390 */
1391 conn = connect_database(dbinfo[0].pubconninfo, true);
1392
1393 /*
1394 * Write recovery parameters.
1395 *
1396 * The subscriber is not running yet. In dry run mode, the recovery
1397 * parameters *won't* be written. An invalid LSN is used for printing
1398 * purposes. Additional recovery parameters are added here. It avoids
1399 * unexpected behavior such as end of recovery as soon as a consistent
1400 * state is reached (recovery_target) and failure due to multiple recovery
1401 * targets (name, time, xid, LSN).
1402 */
1404 appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1406 "recovery_target_timeline = 'latest'\n");
1407
1408 /*
1409 * Set recovery_target_inclusive = false to avoid reapplying the
1410 * transaction committed at 'lsn' after subscription is enabled. This is
1411 * because the provided 'lsn' is also used as the replication start point
1412 * for the subscription. So, the server can send the transaction committed
1413 * at that 'lsn' after replication is started which can lead to applying
1414 * the same transaction twice if we keep recovery_target_inclusive = true.
1415 */
1417 "recovery_target_inclusive = false\n");
1419 "recovery_target_action = promote\n");
1420 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1421 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1422 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1423
1424 if (dry_run)
1425 {
1426 appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
1428 "recovery_target_lsn = '%X/%08X'\n",
1430 }
1431 else
1432 {
1433 appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1434 lsn);
1435 }
1436
1437 pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1438
1439 if (!dry_run)
1440 {
1442 FILE *fd;
1443
1444 /* Write the recovery parameters to INCLUDED_CONF_FILE */
1447 fd = fopen(conf_filename, "w");
1448 if (fd == NULL)
1449 pg_fatal("could not open file \"%s\": %m", conf_filename);
1450
1452 pg_fatal("could not write to file \"%s\": %m", conf_filename);
1453
1454 fclose(fd);
1455 recovery_params_set = true;
1456
1457 /* Include conditionally the recovery parameters. */
1460 "include_if_exists '" INCLUDED_CONF_FILE "'\n");
1462 }
1463
1464 disconnect_database(conn, false);
1465}
static PQExpBuffer recoveryconfcontents
static int fd(const char *x, int i)
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)

References appendPQExpBuffer(), appendPQExpBufferStr(), conn, connect_database(), PQExpBufferData::data, datadir, disconnect_database(), dry_run, fb(), fd(), GenerateRecoveryConfig(), INCLUDED_CONF_FILE, InvalidXLogRecPtr, PQExpBufferData::len, LSN_FORMAT_ARGS, MAXPGPATH, pg_fatal, pg_log_debug, recovery_params_set, recoveryconfcontents, resetPQExpBuffer(), snprintf, and WriteRecoveryConfig().

Referenced by main().

◆ setup_subscriber()

static void setup_subscriber ( struct LogicalRepInfo * dbinfo,
const char * consistent_lsn 
)
static

Definition at line 1345 of file pg_createsubscriber.c.

1346{
1347 for (int i = 0; i < num_dbs; i++)
1348 {
1349 PGconn *conn;
1350
1351 /* Connect to subscriber. */
1352 conn = connect_database(dbinfo[i].subconninfo, true);
1353
1354 /*
1355 * We don't need the pre-existing subscriptions on the newly formed
1356 * subscriber. They can connect to other publisher nodes and either
1357 * get some unwarranted data or can lead to ERRORs in connecting to
1358 * such nodes.
1359 */
1361
1362 /* Check and drop the required publications in the given database. */
1364
1365 create_subscription(conn, &dbinfo[i]);
1366
1367 /* Set the replication progress to the correct LSN */
1369
1370 /* Enable subscription */
1371 enable_subscription(conn, &dbinfo[i]);
1372
1373 disconnect_database(conn, false);
1374 }
1375}
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)

References check_and_drop_existing_subscriptions(), check_and_drop_publications(), conn, connect_database(), create_subscription(), disconnect_database(), enable_subscription(), fb(), i, num_dbs, and set_replication_progress().

Referenced by main().

◆ start_standby_server()

static void start_standby_server ( const struct CreateSubscriberOptions * opt,
bool  restricted_access,
bool  restrict_logical_worker 
)
static

Definition at line 1675 of file pg_createsubscriber.c.

1677{
1679 int rc;
1680
1681 appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1683 appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1684
1685 /* Prevent unintended slot invalidation */
1686 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1687
1689 {
1690 appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1691#if !defined(WIN32)
1692
1693 /*
1694 * An empty listen_addresses list means the server does not listen on
1695 * any IP interfaces; only Unix-domain sockets can be used to connect
1696 * to the server. Prevent external connections to minimize the chance
1697 * of failure.
1698 */
1699 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1700 if (opt->socket_dir)
1701 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1702 opt->socket_dir);
1704#endif
1705 }
1706 if (opt->config_file != NULL)
1707 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1708 opt->config_file);
1709
1710 /* Suppress starting logical replication if requested */
1712 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1713
1714 if (opt->log_dir)
1716
1717 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1718 rc = system(pg_ctl_cmd->data);
1719 pg_ctl_status(pg_ctl_cmd->data, rc);
1720 standby_running = true;
1722 pg_log_info("server was started");
1723}
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
void appendShellString(PQExpBuffer buf, const char *str)

References appendPQExpBuffer(), appendPQExpBufferChar(), appendPQExpBufferStr(), appendShellString(), CreateSubscriberOptions::config_file, createPQExpBuffer(), destroyPQExpBuffer(), fb(), CreateSubscriberOptions::log_dir, logdir, pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, SERVER_LOG_FILE_NAME, CreateSubscriberOptions::socket_dir, standby_running, CreateSubscriberOptions::sub_port, and subscriber_dir.

Referenced by main().

◆ stop_standby_server()

static void stop_standby_server ( const char * datadir)
static

Definition at line 1726 of file pg_createsubscriber.c.

1727{
1728 char *pg_ctl_cmd;
1729 int rc;
1730
1731 pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1732 datadir);
1733 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1734 rc = system(pg_ctl_cmd);
1736 standby_running = false;
1737 pg_log_info("server was stopped");
1738}

References datadir, fb(), pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, psprintf(), and standby_running.

Referenced by cleanup_objects_atexit(), main(), and wait_for_end_recovery().

◆ store_pub_sub_info()

static struct LogicalRepInfo * store_pub_sub_info ( const struct CreateSubscriberOptions * opt,
const char * pub_base_conninfo,
const char * sub_base_conninfo 
)
static

Definition at line 514 of file pg_createsubscriber.c.

517{
518 struct LogicalRepInfo *dbinfo;
522 int i = 0;
523
524 dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
525
526 if (num_pubs > 0)
527 pubcell = opt->pub_names.head;
528 if (num_subs > 0)
529 subcell = opt->sub_names.head;
530 if (num_replslots > 0)
532
533 for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
534 {
535 char *conninfo;
536
537 /* Fill publisher attributes */
538 conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
539 dbinfo[i].pubconninfo = conninfo;
540 dbinfo[i].dbname = cell->val;
541 if (num_pubs > 0)
542 dbinfo[i].pubname = pubcell->val;
543 else
544 dbinfo[i].pubname = NULL;
545 if (num_replslots > 0)
546 dbinfo[i].replslotname = replslotcell->val;
547 else
548 dbinfo[i].replslotname = NULL;
549 dbinfo[i].made_replslot = false;
550 dbinfo[i].made_publication = false;
551 /* Fill subscriber attributes */
552 conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
553 dbinfo[i].subconninfo = conninfo;
554 if (num_subs > 0)
555 dbinfo[i].subname = subcell->val;
556 else
557 dbinfo[i].subname = NULL;
558 /* Other fields will be filled later */
559
560 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
561 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
562 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
563 dbinfo[i].pubconninfo);
564 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
565 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
566 dbinfo[i].subconninfo,
567 dbinfos.two_phase ? "true" : "false");
568
569 if (num_pubs > 0)
570 pubcell = pubcell->next;
571 if (num_subs > 0)
572 subcell = subcell->next;
573 if (num_replslots > 0)
575
576 i++;
577 }
578
579 return dbinfo;
580}
#define pg_malloc_array(type, count)
Definition fe_memutils.h:66
static bool two_phase

References concat_conninfo_dbname(), CreateSubscriberOptions::database_names, dbinfos, LogicalRepInfo::dbname, fb(), SimpleStringList::head, i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, num_dbs, num_pubs, num_replslots, num_subs, pg_log_debug, pg_malloc_array, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, CreateSubscriberOptions::replslot_names, LogicalRepInfo::replslotname, CreateSubscriberOptions::sub_names, LogicalRepInfo::subconninfo, LogicalRepInfo::subname, subname, and LogicalRepInfos::two_phase.

Referenced by main().

◆ usage()

static void usage ( void  )
static

Definition at line 284 of file pg_createsubscriber.c.

285{
286 printf(_("%s creates a new logical replica from a standby server.\n\n"),
287 progname);
288 printf(_("Usage:\n"));
289 printf(_(" %s [OPTION]...\n"), progname);
290 printf(_("\nOptions:\n"));
291 printf(_(" -a, --all create subscriptions for all databases except template\n"
292 " databases and databases that don't allow connections\n"));
293 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
294 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
295 printf(_(" -l, --logdir=LOGDIR location for the log directory\n"));
296 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
297 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
298 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
299 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
300 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
301 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
302 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
303 printf(_(" -v, --verbose output verbose messages\n"));
304 printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
305 " databases on the subscriber; accepts: \"%s\"\n"), "publications");
306 printf(_(" --config-file=FILENAME use specified main server configuration\n"
307 " file when running target cluster\n"));
308 printf(_(" --publication=NAME publication name\n"));
309 printf(_(" --replication-slot=NAME replication slot name\n"));
310 printf(_(" --subscription=NAME subscription name\n"));
311 printf(_(" -V, --version output version information, then exit\n"));
312 printf(_(" -?, --help show this help, then exit\n"));
313 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
314 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
315}
#define _(x)
Definition elog.c:96
#define printf(...)
Definition port.h:267

References _, DEFAULT_SUB_PORT, fb(), printf, and progname.

Referenced by main().

◆ wait_for_end_recovery()

static void wait_for_end_recovery ( const char * conninfo,
const struct CreateSubscriberOptions * opt 
)
static

Definition at line 1750 of file pg_createsubscriber.c.

1751{
1752 PGconn *conn;
1753 bool ready = false;
1754 int timer = 0;
1755
1756 pg_log_info("waiting for the target server to reach the consistent state");
1757
1758 conn = connect_database(conninfo, true);
1759
1760 for (;;)
1761 {
1762 /* Did the recovery process finish? We're done if so. */
1764 {
1765 ready = true;
1766 recovery_ended = true;
1767 break;
1768 }
1769
1770 /* Bail out after recovery_timeout seconds if this option is set */
1771 if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1772 {
1774 pg_log_error("recovery timed out");
1776 }
1777
1778 /* Keep waiting */
1781 }
1782
1783 disconnect_database(conn, false);
1784
1785 if (!ready)
1786 pg_fatal("server did not end recovery");
1787
1788 pg_log_info("target server reached the consistent state");
1789 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1790}
#define USECS_PER_SEC
Definition timestamp.h:134
#define pg_log_info_hint(...)
Definition logging.h:132
#define WAIT_INTERVAL
void pg_usleep(long microsec)
Definition signal.c:53

References conn, connect_database(), disconnect_database(), dry_run, fb(), pg_fatal, pg_log_error, pg_log_info, pg_log_info_hint, pg_usleep(), recovery_ended, CreateSubscriberOptions::recovery_timeout, server_is_in_recovery(), stop_standby_server(), subscriber_dir, USECS_PER_SEC, and WAIT_INTERVAL.

Referenced by main().

Variable Documentation

◆ dbinfos

◆ dry_run

◆ logdir

char* logdir = NULL
static

◆ num_dbs

◆ num_pubs

int num_pubs = 0
static

Definition at line 166 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_replslots

int num_replslots = 0
static

Definition at line 168 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_subs

int num_subs = 0
static

Definition at line 167 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ pg_ctl_path

char* pg_ctl_path = NULL
static

Definition at line 172 of file pg_createsubscriber.c.

Referenced by main(), start_standby_server(), and stop_standby_server().

◆ pg_resetwal_path

char* pg_resetwal_path = NULL
static

Definition at line 173 of file pg_createsubscriber.c.

Referenced by main(), and modify_subscriber_sysid().

◆ primary_slot_name

char* primary_slot_name = NULL
static

Definition at line 159 of file pg_createsubscriber.c.

Referenced by check_subscriber(), drop_primary_replication_slot(), and main().

◆ prng_state

pg_prng_state prng_state
static

Definition at line 170 of file pg_createsubscriber.c.

Referenced by generate_object_name(), and setup_publisher().

◆ progname

const char* progname
static

Definition at line 157 of file pg_createsubscriber.c.

Referenced by get_exec_path(), get_sub_conninfo(), main(), and usage().

◆ recovery_ended

bool recovery_ended = false
static

Definition at line 182 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and wait_for_end_recovery().

◆ recovery_params_set

bool recovery_params_set = false
static

Definition at line 184 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and setup_recovery().

◆ standby_running

bool standby_running = false
static

◆ subscriber_dir

◆ success

bool success = false
static

Definition at line 162 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and main().