PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pg_createsubscriber.c File Reference
#include "postgres_fe.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/wait.h>
#include "common/connect.h"
#include "common/controldata_utils.h"
#include "common/file_perm.h"
#include "common/file_utils.h"
#include "common/logging.h"
#include "common/pg_prng.h"
#include "common/restricted_token.h"
#include "datatype/timestamp.h"
#include "fe_utils/recovery_gen.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
#include "fe_utils/version.h"
#include "getopt_long.h"
Include dependency graph for pg_createsubscriber.c:

Go to the source code of this file.

Data Structures

struct  CreateSubscriberOptions
 
struct  LogicalRepInfo
 
struct  LogicalRepInfos
 

Macros

#define DEFAULT_SUB_PORT   "50432"
 
#define OBJECTTYPE_PUBLICATIONS   0x0001
 
#define PG_AUTOCONF_FILENAME   "postgresql.auto.conf"
 
#define INCLUDED_CONF_FILE   "pg_createsubscriber.conf"
 
#define INCLUDED_CONF_FILE_DISABLED   INCLUDED_CONF_FILE ".disabled"
 
#define SERVER_LOG_FILE_NAME   "pg_createsubscriber_server.log"
 
#define INTERNAL_LOG_FILE_NAME   "pg_createsubscriber_internal.log"
 
#define WAIT_INTERVAL   1 /* 1 second */
 

Functions

static void cleanup_objects_atexit (void)
 
static void usage (void)
 
static char * get_base_conninfo (const char *conninfo, char **dbname)
 
static char * get_sub_conninfo (const struct CreateSubscriberOptions *opt)
 
static char * get_exec_path (const char *argv0, const char *progname)
 
static void check_data_directory (const char *datadir)
 
static char * concat_conninfo_dbname (const char *conninfo, const char *dbname)
 
static struct LogicalRepInfo * store_pub_sub_info (const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
 
static PGconn * connect_database (const char *conninfo, bool exit_on_error)
 
static void disconnect_database (PGconn *conn, bool exit_on_error)
 
static uint64 get_primary_sysid (const char *conninfo)
 
static uint64 get_standby_sysid (const char *datadir)
 
static void modify_subscriber_sysid (const struct CreateSubscriberOptions *opt)
 
static bool server_is_in_recovery (PGconn *conn)
 
static char * generate_object_name (PGconn *conn)
 
static void check_publisher (const struct LogicalRepInfo *dbinfo)
 
static char * setup_publisher (struct LogicalRepInfo *dbinfo)
 
static void check_subscriber (const struct LogicalRepInfo *dbinfo)
 
static void setup_subscriber (struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 
static void setup_recovery (const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
 
static void drop_primary_replication_slot (struct LogicalRepInfo *dbinfo, const char *slotname)
 
static void drop_failover_replication_slots (struct LogicalRepInfo *dbinfo)
 
static char * create_logical_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void drop_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
 
static void pg_ctl_status (const char *pg_ctl_cmd, int rc)
 
static void start_standby_server (const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
 
static void stop_standby_server (const char *datadir)
 
static void wait_for_end_recovery (const char *conninfo, const struct CreateSubscriberOptions *opt)
 
static void create_publication (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static bool find_publication (PGconn *conn, const char *pubname, const char *dbname)
 
static void drop_publication (PGconn *conn, const char *pubname, const char *dbname)
 
static void check_and_drop_publications (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void create_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void set_replication_progress (PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
 
static void enable_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void check_and_drop_existing_subscriptions (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void drop_existing_subscription (PGconn *conn, const char *subname, const char *dbname)
 
static void get_publisher_databases (struct CreateSubscriberOptions *opt, bool dbnamespecified)
 
static void appendConnStrItem (PQExpBuffer buf, const char *keyword, const char *val)
 
static void make_output_dirs (const char *log_basedir)
 
int main (int argc, char **argv)
 

Variables

static const char * progname
 
static char * primary_slot_name = NULL
 
static bool dry_run = false
 
static bool success = false
 
static struct LogicalRepInfos dbinfos
 
static int num_dbs = 0
 
static int num_pubs = 0
 
static int num_subs = 0
 
static int num_replslots = 0
 
static pg_prng_state prng_state
 
static char * pg_ctl_path = NULL
 
static char * pg_resetwal_path = NULL
 
static char * logdir = NULL
 
static char * subscriber_dir = NULL
 
static bool recovery_ended = false
 
static bool standby_running = false
 
static bool recovery_params_set = false
 

Macro Definition Documentation

◆ DEFAULT_SUB_PORT

#define DEFAULT_SUB_PORT   "50432"

Definition at line 35 of file pg_createsubscriber.c.

◆ INCLUDED_CONF_FILE

#define INCLUDED_CONF_FILE   "pg_createsubscriber.conf"

Definition at line 50 of file pg_createsubscriber.c.

◆ INCLUDED_CONF_FILE_DISABLED

#define INCLUDED_CONF_FILE_DISABLED   INCLUDED_CONF_FILE ".disabled"

Definition at line 51 of file pg_createsubscriber.c.

◆ INTERNAL_LOG_FILE_NAME

#define INTERNAL_LOG_FILE_NAME   "pg_createsubscriber_internal.log"

Definition at line 54 of file pg_createsubscriber.c.

◆ OBJECTTYPE_PUBLICATIONS

#define OBJECTTYPE_PUBLICATIONS   0x0001

Definition at line 36 of file pg_createsubscriber.c.

◆ PG_AUTOCONF_FILENAME

#define PG_AUTOCONF_FILENAME   "postgresql.auto.conf"

Definition at line 49 of file pg_createsubscriber.c.

◆ SERVER_LOG_FILE_NAME

#define SERVER_LOG_FILE_NAME   "pg_createsubscriber_server.log"

Definition at line 53 of file pg_createsubscriber.c.

◆ WAIT_INTERVAL

#define WAIT_INTERVAL   1 /* 1 second */

Definition at line 155 of file pg_createsubscriber.c.

Function Documentation

◆ appendConnStrItem()

static void appendConnStrItem ( PQExpBuffer  buf,
const char *keyword,
const char *val 
)
static

Definition at line 321 of file pg_createsubscriber.c.

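322{
323 if (buf->len > 0)
324 appendPQExpBufferChar(buf, ' ');
325 appendPQExpBufferStr(buf, keyword);
326 appendPQExpBufferChar(buf, '=');
327 appendConnStrVal(buf, val);
328}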
long val
Definition informix.c:689
static char buf[DEFAULT_XLOG_SEG_SIZE]
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
void appendConnStrVal(PQExpBuffer buf, const char *str)

References appendConnStrVal(), appendPQExpBufferChar(), appendPQExpBufferStr(), buf, and val.

Referenced by concat_conninfo_dbname(), get_base_conninfo(), and get_sub_conninfo().

◆ check_and_drop_existing_subscriptions()

static void check_and_drop_existing_subscriptions ( PGconn *conn,
const struct LogicalRepInfo *dbinfo 
)
static

Definition at line 1304 of file pg_createsubscriber.c.

1306{
1308 char *dbname;
1309 PGresult *res;
1310
1311 Assert(conn != NULL);
1312
1313 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1314
1315 appendPQExpBuffer(query,
1316 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1317 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1318 "WHERE d.datname = %s",
1319 dbname);
1320 res = PQexec(conn, query->data);
1321
1322 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1323 {
1324 pg_log_error("could not obtain pre-existing subscriptions: %s",
1327 }
1328
1329 for (int i = 0; i < PQntuples(res); i++)
1331 dbinfo->dbname);
1332
1333 PQclear(res);
1334 destroyPQExpBuffer(query);
1336}
#define Assert(condition)
Definition c.h:943
void PQfreemem(void *ptr)
Definition fe-exec.c:4068
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition fe-exec.c:4418
PGresult * PQexec(PGconn *conn, const char *query)
Definition fe-exec.c:2279
int i
Definition isn.c:77
#define PQresultErrorMessage
#define PQgetvalue
#define PQclear
#define PQresultStatus
#define PQntuples
@ PGRES_TUPLES_OK
Definition libpq-fe.h:134
#define pg_log_error(...)
Definition logging.h:108
static void drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
static void disconnect_database(PGconn *conn, bool exit_on_error)
PQExpBuffer createPQExpBuffer(void)
Definition pqexpbuffer.c:72
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
static int fb(int x)
char * dbname
Definition streamutil.c:49
PGconn * conn
Definition streamutil.c:52

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), PQExpBufferData::data, LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), drop_existing_subscription(), fb(), i, pg_log_error, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQntuples, PQresultErrorMessage, and PQresultStatus.

Referenced by setup_subscriber().

◆ check_and_drop_publications()

static void check_and_drop_publications ( PGconn *conn,
struct LogicalRepInfo *dbinfo 
)
static

Definition at line 1926 of file pg_createsubscriber.c.

1927{
1928 PGresult *res;
1930
1931 Assert(conn != NULL);
1932
1933 if (drop_all_pubs)
1934 {
1935 pg_log_info("dropping all existing publications in database \"%s\"",
1936 dbinfo->dbname);
1937
1938 /* Fetch all publication names */
1939 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1940 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1941 {
1942 pg_log_error("could not obtain publication information: %s",
1944 PQclear(res);
1946 }
1947
1948 /* Drop each publication */
1949 for (int i = 0; i < PQntuples(res); i++)
1950 drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname);
1951
1952 PQclear(res);
1953 }
1954 else
1955 {
1956 /* Drop publication only if it was created by this tool */
1957 if (dbinfo->made_publication)
1958 {
1959 drop_publication(conn, dbinfo->pubname, dbinfo->dbname);
1960 }
1961 else
1962 {
1963 if (dry_run)
1964 pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
1965 dbinfo->pubname, dbinfo->dbname);
1966 else
1967 pg_log_info("preserving existing publication \"%s\" in database \"%s\"",
1968 dbinfo->pubname, dbinfo->dbname);
1969 }
1970 }
1971}
#define pg_log_info(...)
Definition logging.h:126
static struct LogicalRepInfos dbinfos
static bool dry_run
#define OBJECTTYPE_PUBLICATIONS
static void drop_publication(PGconn *conn, const char *pubname, const char *dbname)

References Assert, conn, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), dry_run, fb(), i, LogicalRepInfo::made_publication, OBJECTTYPE_PUBLICATIONS, LogicalRepInfos::objecttypes_to_clean, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, and LogicalRepInfo::pubname.

Referenced by setup_subscriber().

◆ check_data_directory()

static void check_data_directory ( const char *datadir)
static

Definition at line 449 of file pg_createsubscriber.c.

450{
451 struct stat statbuf;
452 uint32 major_version;
453 char *version_str;
454
455 pg_log_info("checking if directory \"%s\" is a cluster data directory",
456 datadir);
457
458 if (stat(datadir, &statbuf) != 0)
459 {
460 if (errno == ENOENT)
461 pg_fatal("data directory \"%s\" does not exist", datadir);
462 else
463 pg_fatal("could not access directory \"%s\": %m", datadir);
464 }
465
466 /*
467 * Retrieve the contents of this cluster's PG_VERSION. We require
468 * compatibility with the same major version as the one this tool is
469 * compiled with.
470 */
472 if (major_version != PG_MAJORVERSION_NUM)
473 {
474 pg_log_error("data directory is of wrong version");
475 pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
476 "PG_VERSION", version_str, PG_MAJORVERSION);
477 exit(1);
478 }
479}
uint32_t uint32
Definition c.h:624
uint32 get_pg_version(const char *datadir, char **version_str)
Definition version.c:44
#define pg_log_error_detail(...)
Definition logging.h:111
#define pg_fatal(...)
char * datadir
#define GET_PG_MAJORVERSION_NUM(v)
Definition version.h:19
#define stat
Definition win32_port.h:74

References datadir, fb(), GET_PG_MAJORVERSION_NUM, get_pg_version(), pg_fatal, pg_log_error, pg_log_error_detail, pg_log_info, and stat.

Referenced by main().

◆ check_publisher()

static void check_publisher ( const struct LogicalRepInfo *dbinfo)
static

Definition at line 1015 of file pg_createsubscriber.c.

1016{
1017 PGconn *conn;
1018 PGresult *res;
1019 bool failed = false;
1020
1021 char *wal_level;
1022 int max_repslots;
1023 int cur_repslots;
1024 int max_walsenders;
1025 int cur_walsenders;
1028
1029 pg_log_info("checking settings on publisher");
1030
1031 conn = connect_database(dbinfo[0].pubconninfo, true);
1032
1033 /*
1034 * If the primary server is in recovery (i.e. cascading replication),
1035 * objects (publication) cannot be created because it is read only.
1036 */
1038 {
1039 pg_log_error("primary server cannot be in recovery");
1041 }
1042
1043 /*------------------------------------------------------------------------
1044 * Logical replication requires a few parameters to be set on publisher.
1045 * Since these parameters are not a requirement for physical replication,
1046 * we should check it to make sure it won't fail.
1047 *
1048 * - wal_level >= replica
1049 * - max_replication_slots >= current + number of dbs to be converted
1050 * - max_wal_senders >= current + number of dbs to be converted
1051 * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
1052 * -----------------------------------------------------------------------
1053 */
1054 res = PQexec(conn,
1055 "SELECT pg_catalog.current_setting('wal_level'),"
1056 " pg_catalog.current_setting('max_replication_slots'),"
1057 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
1058 " pg_catalog.current_setting('max_wal_senders'),"
1059 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
1060 " pg_catalog.current_setting('max_prepared_transactions'),"
1061 " pg_catalog.current_setting('max_slot_wal_keep_size')");
1062
1063 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1064 {
1065 pg_log_error("could not obtain publisher settings: %s",
1068 }
1069
1070 wal_level = pg_strdup(PQgetvalue(res, 0, 0));
1071 max_repslots = atoi(PQgetvalue(res, 0, 1));
1072 cur_repslots = atoi(PQgetvalue(res, 0, 2));
1073 max_walsenders = atoi(PQgetvalue(res, 0, 3));
1074 cur_walsenders = atoi(PQgetvalue(res, 0, 4));
1077
1078 PQclear(res);
1079
1080 pg_log_debug("publisher: wal_level: %s", wal_level);
1081 pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
1082 pg_log_debug("publisher: current replication slots: %d", cur_repslots);
1083 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
1084 pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
1085 pg_log_debug("publisher: max_prepared_transactions: %d",
1087 pg_log_debug("publisher: max_slot_wal_keep_size: %s",
1089
1090 disconnect_database(conn, false);
1091
1092 if (strcmp(wal_level, "minimal") == 0)
1093 {
1094 pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
1095 failed = true;
1096 }
1097
1099 {
1100 pg_log_error("publisher requires %d replication slots, but only %d remain",
1102 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1103 "max_replication_slots", cur_repslots + num_dbs);
1104 failed = true;
1105 }
1106
1108 {
1109 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
1111 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1112 "max_wal_senders", cur_walsenders + num_dbs);
1113 failed = true;
1114 }
1115
1117 {
1118 pg_log_warning("two_phase option will not be enabled for replication slots");
1119 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
1120 "Prepared transactions will be replicated at COMMIT PREPARED.");
1121 pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
1122 }
1123
1124 /*
1125 * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
1126 * is set to a non-default value, it may cause replication failures due to
1127 * required WAL files being prematurely removed.
1128 */
1129 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
1130 {
1131 pg_log_warning("required WAL could be removed from the publisher");
1132 pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1133 "max_slot_wal_keep_size");
1134 }
1135
1137
1138 if (failed)
1139 exit(1);
1140}
char * pg_strdup(const char *in)
Definition fe_memutils.c:91
void pg_free(void *ptr)
#define pg_log_error_hint(...)
Definition logging.h:114
#define pg_log_warning_hint(...)
Definition logging.h:123
#define pg_log_warning_detail(...)
Definition logging.h:120
#define pg_log_debug(...)
Definition logging.h:135
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
#define pg_log_warning(...)
Definition pgfnames.c:24
int wal_level
Definition xlog.c:138

References conn, connect_database(), dbinfos, disconnect_database(), dry_run, fb(), num_dbs, pg_free(), pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_log_warning, pg_log_warning_detail, pg_log_warning_hint, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, PQresultStatus, server_is_in_recovery(), LogicalRepInfos::two_phase, and wal_level.

Referenced by main().

◆ check_subscriber()

static void check_subscriber ( const struct LogicalRepInfo *dbinfo)
static

Definition at line 1154 of file pg_createsubscriber.c.

1155{
1156 PGconn *conn;
1157 PGresult *res;
1158 bool failed = false;
1159
1160 int max_lrworkers;
1161 int max_replorigins;
1162 int max_wprocs;
1163
1164 pg_log_info("checking settings on subscriber");
1165
1166 conn = connect_database(dbinfo[0].subconninfo, true);
1167
1168 /* The target server must be a standby */
1170 {
1171 pg_log_error("target server must be a standby");
1173 }
1174
1175 /*------------------------------------------------------------------------
1176 * Logical replication requires a few parameters to be set on subscriber.
1177 * Since these parameters are not a requirement for physical replication,
1178 * we should check it to make sure it won't fail.
1179 *
1180 * - max_active_replication_origins >= number of dbs to be converted
1181 * - max_logical_replication_workers >= number of dbs to be converted
1182 * - max_worker_processes >= 1 + number of dbs to be converted
1183 *------------------------------------------------------------------------
1184 */
1185 res = PQexec(conn,
1186 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1187 "'max_logical_replication_workers', "
1188 "'max_active_replication_origins', "
1189 "'max_worker_processes', "
1190 "'primary_slot_name') "
1191 "ORDER BY name");
1192
1193 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1194 {
1195 pg_log_error("could not obtain subscriber settings: %s",
1198 }
1199
1200 max_replorigins = atoi(PQgetvalue(res, 0, 0));
1201 max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1202 max_wprocs = atoi(PQgetvalue(res, 2, 0));
1203 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1205
1206 pg_log_debug("subscriber: max_logical_replication_workers: %d",
1208 pg_log_debug("subscriber: max_active_replication_origins: %d", max_replorigins);
1209 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1211 pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1212
1213 PQclear(res);
1214
1215 disconnect_database(conn, false);
1216
1218 {
1219 pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1221 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1222 "max_active_replication_origins", num_dbs);
1223 failed = true;
1224 }
1225
1226 if (max_lrworkers < num_dbs)
1227 {
1228 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1230 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1231 "max_logical_replication_workers", num_dbs);
1232 failed = true;
1233 }
1234
1235 if (max_wprocs < num_dbs + 1)
1236 {
1237 pg_log_error("subscriber requires %d worker processes, but only %d remain",
1238 num_dbs + 1, max_wprocs);
1239 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1240 "max_worker_processes", num_dbs + 1);
1241 failed = true;
1242 }
1243
1244 if (failed)
1245 exit(1);
1246}
static char * primary_slot_name

References conn, connect_database(), disconnect_database(), fb(), num_dbs, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, PQresultStatus, primary_slot_name, and server_is_in_recovery().

Referenced by main().

◆ cleanup_objects_atexit()

static void cleanup_objects_atexit ( void  )
static

Definition at line 201 of file pg_createsubscriber.c.

202{
203 /* Rename the included configuration file, if necessary. */
205 {
208
213
215 {
216 /* durable_rename() has already logged something. */
217 pg_log_warning_hint("A manual removal of the recovery parameters might be required.");
218 }
219 }
220
221 if (success)
222 return;
223
224 /*
225 * If the server is promoted, there is no way to use the current setup
226 * again. Warn the user that a new replication setup should be done before
227 * trying again.
228 */
229 if (recovery_ended)
230 {
231 pg_log_warning("failed after the end of recovery");
232 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
233 "You must recreate the physical replica before continuing.");
234 }
235
236 for (int i = 0; i < num_dbs; i++)
237 {
238 struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
239
240 if (dbinfo->made_publication || dbinfo->made_replslot)
241 {
242 PGconn *conn;
243
244 conn = connect_database(dbinfo->pubconninfo, false);
245 if (conn != NULL)
246 {
247 if (dbinfo->made_publication)
248 drop_publication(conn, dbinfo->pubname, dbinfo->dbname);
249 if (dbinfo->made_replslot)
250 drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
252 }
253 else
254 {
255 /*
256 * If a connection could not be established, inform the user
257 * that some objects were left on primary and should be
258 * removed before trying again.
259 */
260 if (dbinfo->made_publication)
261 {
262 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
263 dbinfo->pubname,
264 dbinfo->dbname);
265 pg_log_warning_hint("Drop this publication before trying again.");
266 }
267 if (dbinfo->made_replslot)
268 {
269 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
270 dbinfo->replslotname,
271 dbinfo->dbname);
272 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
273 }
274 }
275 }
276 }
277
278 if (standby_running)
280}
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:783
#define MAXPGPATH
static void stop_standby_server(const char *datadir)
static char * subscriber_dir
#define INCLUDED_CONF_FILE
static bool success
static bool recovery_ended
#define INCLUDED_CONF_FILE_DISABLED
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static bool recovery_params_set
#define snprintf
Definition port.h:261
struct LogicalRepInfo * dbinfo

References conn, connect_database(), LogicalRepInfos::dbinfo, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), drop_replication_slot(), durable_rename(), fb(), i, INCLUDED_CONF_FILE, INCLUDED_CONF_FILE_DISABLED, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, MAXPGPATH, num_dbs, pg_log_warning, pg_log_warning_hint, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, recovery_ended, recovery_params_set, LogicalRepInfo::replslotname, snprintf, standby_running, stop_standby_server(), subscriber_dir, and success.

Referenced by main().

◆ concat_conninfo_dbname()

static char * concat_conninfo_dbname ( const char *conninfo,
const char *dbname 
)
static

Definition at line 489 of file pg_createsubscriber.c.

490{
491 PQExpBuffer buf = createPQExpBuffer();
492 char *ret;
493
494 Assert(conninfo != NULL);
495
496 appendPQExpBufferStr(buf, conninfo);
497 appendConnStrItem(buf, "dbname", dbname);
498
499 ret = pg_strdup(buf->data);
500 destroyPQExpBuffer(buf);
501
502 return ret;
503}
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)

References appendConnStrItem(), appendPQExpBufferStr(), Assert, buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), fb(), and pg_strdup().

Referenced by get_publisher_databases(), and store_pub_sub_info().

◆ connect_database()

static PGconn * connect_database ( const char *conninfo,
bool  exit_on_error 
)
static

Definition at line 586 of file pg_createsubscriber.c.

587{
588 PGconn *conn;
589 PGresult *res;
590
591 conn = PQconnectdb(conninfo);
592 if (PQstatus(conn) != CONNECTION_OK)
593 {
594 pg_log_error("connection to database failed: %s",
595 PQerrorMessage(conn));
596 PQfinish(conn);
597
598 if (exit_on_error)
599 exit(1);
600 return NULL;
601 }
602
603 /* Secure search_path */
604 res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
605 if (PQresultStatus(res) != PGRES_TUPLES_OK)
606 {
607 pg_log_error("could not clear \"search_path\": %s",
608 PQresultErrorMessage(res));
609 PQclear(res);
610 PQfinish(conn);
611
612 if (exit_on_error)
613 exit(1);
614 return NULL;
615 }
616 PQclear(res);
617
618 return conn;
619}
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition connect.h:25
PGconn * PQconnectdb(const char *conninfo)
Definition fe-connect.c:830
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
@ CONNECTION_OK
Definition libpq-fe.h:90

References ALWAYS_SECURE_SEARCH_PATH_SQL, conn, CONNECTION_OK, fb(), pg_log_error, PGRES_TUPLES_OK, PQclear, PQconnectdb(), PQerrorMessage(), PQexec(), PQfinish(), PQresultErrorMessage, PQresultStatus, and PQstatus().

Referenced by check_publisher(), check_subscriber(), cleanup_objects_atexit(), drop_failover_replication_slots(), drop_primary_replication_slot(), get_primary_sysid(), get_publisher_databases(), setup_publisher(), setup_recovery(), setup_subscriber(), and wait_for_end_recovery().

◆ create_logical_replication_slot()

static char * create_logical_replication_slot ( PGconn *conn,
struct LogicalRepInfo *dbinfo 
)
static

Definition at line 1546 of file pg_createsubscriber.c.

1547{
1549 PGresult *res = NULL;
1550 const char *slot_name = dbinfo->replslotname;
1551 char *slot_name_esc;
1552 char *lsn = NULL;
1553
1554 Assert(conn != NULL);
1555
1556 if (dry_run)
1557 pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1558 slot_name, dbinfo->dbname);
1559 else
1560 pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
1561 slot_name, dbinfo->dbname);
1562
1563 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1564
1566 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1568 dbinfos.two_phase ? "true" : "false");
1569
1571
1572 pg_log_debug("command is: %s", str->data);
1573
1574 if (!dry_run)
1575 {
1576 res = PQexec(conn, str->data);
1577 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1578 {
1579 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1580 slot_name, dbinfo->dbname,
1582 PQclear(res);
1584 return NULL;
1585 }
1586
1587 lsn = pg_strdup(PQgetvalue(res, 0, 0));
1588 PQclear(res);
1589 }
1590
1591 /* For cleanup purposes */
1592 dbinfo->made_replslot = true;
1593
1595
1596 return lsn;
1597}
const char * str

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, fb(), LogicalRepInfo::made_replslot, pg_log_debug, pg_log_error, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQresultErrorMessage, PQresultStatus, LogicalRepInfo::replslotname, str, and LogicalRepInfos::two_phase.

Referenced by setup_publisher().

◆ create_publication()

static void create_publication ( PGconn *conn,
struct LogicalRepInfo *dbinfo 
)
static

Definition at line 1794 of file pg_createsubscriber.c.

1795{
1797 PGresult *res;
1798 char *ipubname_esc;
1799 char *spubname_esc;
1800
1801 Assert(conn != NULL);
1802
1804 spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1805
1806 /* Check if the publication already exists */
1808 "SELECT 1 FROM pg_catalog.pg_publication "
1809 "WHERE pubname = %s",
1810 spubname_esc);
1811 res = PQexec(conn, str->data);
1812 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1813 {
1814 pg_log_error("could not obtain publication information: %s",
1817 }
1818
1819 if (PQntuples(res) == 1)
1820 {
1821 /*
1822 * Unfortunately, if it reaches this code path, it will always fail
1823 * (unless you decide to change the existing publication name). That's
1824 * bad but it is very unlikely that the user will choose a name with
1825 * pg_createsubscriber_ prefix followed by the exact database oid and
1826 * a random number.
1827 */
1828 pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1829 pg_log_error_hint("Consider renaming this publication before continuing.");
1831 }
1832
1833 PQclear(res);
1835
1836 if (dry_run)
1837 pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
1838 dbinfo->pubname, dbinfo->dbname);
1839 else
1840 pg_log_info("creating publication \"%s\" in database \"%s\"",
1841 dbinfo->pubname, dbinfo->dbname);
1842
1843 appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1844 ipubname_esc);
1845
1846 pg_log_debug("command is: %s", str->data);
1847
1848 if (!dry_run)
1849 {
1850 res = PQexec(conn, str->data);
1851 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1852 {
1853 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1854 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1856 }
1857 PQclear(res);
1858 }
1859
1860 /* For cleanup purposes */
1861 dbinfo->made_publication = true;
1862
1866}
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition fe-exec.c:4424
@ PGRES_COMMAND_OK
Definition libpq-fe.h:131
void resetPQExpBuffer(PQExpBuffer str)

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), LogicalRepInfo::made_publication, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear, PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQntuples, PQresultErrorMessage, PQresultStatus, LogicalRepInfo::pubname, resetPQExpBuffer(), and str.

Referenced by setup_publisher().

◆ create_subscription()

static void create_subscription ( PGconn *conn,
const struct LogicalRepInfo *dbinfo 
)
static

Definition at line 1985 of file pg_createsubscriber.c.

1986{
1988 PGresult *res;
1989 char *pubname_esc;
1990 char *subname_esc;
1991 char *pubconninfo_esc;
1992 char *replslotname_esc;
1993
1994 Assert(conn != NULL);
1995
2000
2001 if (dry_run)
2002 pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
2003 dbinfo->subname, dbinfo->dbname);
2004 else
2005 pg_log_info("creating subscription \"%s\" in database \"%s\"",
2006 dbinfo->subname, dbinfo->dbname);
2007
2009 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
2010 "WITH (create_slot = false, enabled = false, "
2011 "slot_name = %s, copy_data = false, two_phase = %s)",
2013 dbinfos.two_phase ? "true" : "false");
2014
2019
2020 pg_log_debug("command is: %s", str->data);
2021
2022 if (!dry_run)
2023 {
2024 res = PQexec(conn, str->data);
2025 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2026 {
2027 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
2028 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
2030 }
2031 PQclear(res);
2032 }
2033
2035}

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, str, LogicalRepInfo::subname, and LogicalRepInfos::two_phase.

Referenced by setup_subscriber().

◆ disconnect_database()

◆ drop_existing_subscription()

static void drop_existing_subscription ( PGconn * conn,
const char * subname,
const char * dbname
)
static

Definition at line 1255 of file pg_createsubscriber.c.

1256{
1258 PGresult *res;
1259 char *subname_esc;
1260
1261 Assert(conn != NULL);
1262
1264
1265 /*
1266 * Construct a query string. These commands are allowed to be executed
1267 * within a transaction.
1268 */
1269 appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1270 subname_esc);
1271 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1272 subname_esc);
1273 appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname_esc);
1274
1276
1277 if (dry_run)
1278 pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
1279 subname, dbname);
1280 else
1281 {
1282 pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1283 subname, dbname);
1284
1285 res = PQexec(conn, query->data);
1286
1287 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1288 {
1289 pg_log_error("could not drop subscription \"%s\": %s",
1292 }
1293
1294 PQclear(res);
1295 }
1296
1297 destroyPQExpBuffer(query);
1298}
NameData subname

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), PQExpBufferData::data, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, and subname.

Referenced by check_and_drop_existing_subscriptions().

◆ drop_failover_replication_slots()

static void drop_failover_replication_slots ( struct LogicalRepInfo * dbinfo)
static

Definition at line 1504 of file pg_createsubscriber.c.

1505{
1506 PGconn *conn;
1507 PGresult *res;
1508
1509 conn = connect_database(dbinfo[0].subconninfo, false);
1510 if (conn != NULL)
1511 {
1512 /* Get failover replication slot names */
1513 res = PQexec(conn,
1514 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1515
1516 if (PQresultStatus(res) == PGRES_TUPLES_OK)
1517 {
1518 /* Remove failover replication slots from subscriber */
1519 for (int i = 0; i < PQntuples(res); i++)
1520 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1521 }
1522 else
1523 {
1524 pg_log_warning("could not obtain failover replication slot information: %s",
1526 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1527 }
1528
1529 PQclear(res);
1530 disconnect_database(conn, false);
1531 }
1532 else
1533 {
1534 pg_log_warning("could not drop failover replication slot");
1535 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1536 }
1537}

References conn, connect_database(), disconnect_database(), drop_replication_slot(), fb(), i, pg_log_warning, pg_log_warning_hint, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, and PQresultStatus.

Referenced by main().

◆ drop_primary_replication_slot()

static void drop_primary_replication_slot ( struct LogicalRepInfo * dbinfo,
const char * slotname
)
static

Definition at line 1474 of file pg_createsubscriber.c.

1475{
1476 PGconn *conn;
1477
1478 /* Replication slot does not exist, do nothing */
1479 if (!primary_slot_name)
1480 return;
1481
1482 conn = connect_database(dbinfo[0].pubconninfo, false);
1483 if (conn != NULL)
1484 {
1485 drop_replication_slot(conn, &dbinfo[0], slotname);
1486 disconnect_database(conn, false);
1487 }
1488 else
1489 {
1490 pg_log_warning("could not drop replication slot \"%s\" on primary",
1491 slotname);
1492 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1493 }
1494}

References conn, connect_database(), disconnect_database(), drop_replication_slot(), fb(), pg_log_warning, pg_log_warning_hint, and primary_slot_name.

Referenced by main().

◆ drop_publication()

static void drop_publication ( PGconn * conn,
const char * pubname,
const char * dbname
)
static

Definition at line 1872 of file pg_createsubscriber.c.

1873{
1875 PGresult *res;
1876 char *pubname_esc;
1877
1878 Assert(conn != NULL);
1879
1880 pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1881
1882 if (dry_run)
1883 pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
1884 pubname, dbname);
1885 else
1886 pg_log_info("dropping publication \"%s\" in database \"%s\"",
1887 pubname, dbname);
1888
1889 appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1890
1892
1893 pg_log_debug("command is: %s", str->data);
1894
1895 if (!dry_run)
1896 {
1897 res = PQexec(conn, str->data);
1898 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1899 {
1900 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1901 pubname, dbname, PQresultErrorMessage(res));
1902
1903 /*
1904 * Don't disconnect and exit here. This routine is used by primary
1905 * (cleanup publication / replication slot due to an error) and
1906 * subscriber (remove the replicated publications). In both cases,
1907 * it can continue and provide instructions for the user to remove
1908 * it later if cleanup fails.
1909 */
1910 }
1911 PQclear(res);
1912 }
1913
1915}

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbname, destroyPQExpBuffer(), dry_run, fb(), pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, and str.

Referenced by check_and_drop_publications(), and cleanup_objects_atexit().

◆ drop_replication_slot()

static void drop_replication_slot ( PGconn * conn,
struct LogicalRepInfo * dbinfo,
const char * slot_name
)
static

Definition at line 1600 of file pg_createsubscriber.c.

1602{
1604 char *slot_name_esc;
1605 PGresult *res;
1606
1607 Assert(conn != NULL);
1608
1609 if (dry_run)
1610 pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1611 slot_name, dbinfo->dbname);
1612 else
1613 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1614 slot_name, dbinfo->dbname);
1615
1616 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1617
1618 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1619
1621
1622 pg_log_debug("command is: %s", str->data);
1623
1624 if (!dry_run)
1625 {
1626 res = PQexec(conn, str->data);
1627 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1628 {
1629 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1630 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1631 }
1632
1633 PQclear(res);
1634 }
1635
1637}

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, fb(), pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, and str.

Referenced by cleanup_objects_atexit(), drop_failover_replication_slots(), and drop_primary_replication_slot().

◆ enable_subscription()

static void enable_subscription ( PGconn * conn,
const struct LogicalRepInfo * dbinfo
)
static

Definition at line 2143 of file pg_createsubscriber.c.

2144{
2146 PGresult *res;
2147 char *subname;
2148
2149 Assert(conn != NULL);
2150
2151 subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2152
2153 if (dry_run)
2154 pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
2155 dbinfo->subname, dbinfo->dbname);
2156 else
2157 pg_log_info("enabling subscription \"%s\" in database \"%s\"",
2158 dbinfo->subname, dbinfo->dbname);
2159
2160 appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
2161
2162 pg_log_debug("command is: %s", str->data);
2163
2164 if (!dry_run)
2165 {
2166 res = PQexec(conn, str->data);
2167 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2168 {
2169 pg_log_error("could not enable subscription \"%s\": %s",
2170 dbinfo->subname, PQresultErrorMessage(res));
2172 }
2173
2174 PQclear(res);
2175 }
2176
2179}

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ find_publication()

static bool find_publication ( PGconn * conn,
const char * pubname,
const char * dbname
)
static

Definition at line 831 of file pg_createsubscriber.c.

832{
834 PGresult *res;
835 bool found = false;
836 char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
837
839 "SELECT 1 FROM pg_catalog.pg_publication "
840 "WHERE pubname = %s",
842 res = PQexec(conn, str->data);
844 {
845 pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
846 pubname, dbname, PQerrorMessage(conn));
848 }
849
850 if (PQntuples(res) == 1)
851 found = true;
852
853 PQclear(res);
856
857 return found;
858}

References appendPQExpBuffer(), conn, createPQExpBuffer(), dbname, destroyPQExpBuffer(), disconnect_database(), fb(), pg_log_error, PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQescapeLiteral(), PQexec(), PQfreemem(), PQntuples, PQresultStatus, and str.

Referenced by setup_publisher().

◆ generate_object_name()

static char * generate_object_name ( PGconn * conn)
static

Definition at line 785 of file pg_createsubscriber.c.

786{
787 PGresult *res;
788 Oid oid;
789 uint32 rand;
790 char *objname;
791
792 res = PQexec(conn,
793 "SELECT oid FROM pg_catalog.pg_database "
794 "WHERE datname = pg_catalog.current_database()");
796 {
797 pg_log_error("could not obtain database OID: %s",
800 }
801
802 if (PQntuples(res) != 1)
803 {
804 pg_log_error("could not obtain database OID: got %d rows, expected %d row",
805 PQntuples(res), 1);
807 }
808
809 /* Database OID */
810 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
811
812 PQclear(res);
813
814 /* Random unsigned integer */
816
817 /*
818 * Build the object name. The name must not exceed NAMEDATALEN - 1. The
819 * current naming scheme uses a maximum of 40 characters (20 + 10 + 1 + 8 +
820 * '\0').
821 */
822 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
823
824 return objname;
825}
static pg_prng_state prng_state
uint32 pg_prng_uint32(pg_prng_state *state)
Definition pg_prng.c:227
unsigned int Oid
char * psprintf(const char *fmt,...)
Definition psprintf.c:43

References conn, disconnect_database(), fb(), pg_log_error, pg_prng_uint32(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, prng_state, and psprintf().

Referenced by setup_publisher().

◆ get_base_conninfo()

static char * get_base_conninfo ( const char * conninfo,
char **  dbname 
)
static

Definition at line 343 of file pg_createsubscriber.c.

344{
348 char *errmsg = NULL;
349 char *ret;
350
351 conn_opts = PQconninfoParse(conninfo, &errmsg);
352 if (conn_opts == NULL)
353 {
354 pg_log_error("could not parse connection string: %s", errmsg);
356 return NULL;
357 }
358
360 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
361 {
362 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
363 {
364 if (strcmp(conn_opt->keyword, "dbname") == 0)
365 {
366 if (dbname)
367 *dbname = pg_strdup(conn_opt->val);
368 continue;
369 }
370 appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
371 }
372 }
373
374 ret = pg_strdup(buf->data);
375
378
379 return ret;
380}
void PQconninfoFree(PQconninfoOption *connOptions)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
static char * errmsg

References appendConnStrItem(), buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), errmsg, fb(), pg_log_error, pg_strdup(), PQconninfoFree(), PQconninfoParse(), and PQfreemem().

Referenced by main().

◆ get_exec_path()

static char * get_exec_path ( const char * argv0,
const char * progname
)
static

Definition at line 413 of file pg_createsubscriber.c.

414{
415 char *versionstr;
416 char *exec_path;
417 int ret;
418
419 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
422
423 if (ret < 0)
424 {
425 char full_path[MAXPGPATH];
426
427 if (find_my_exec(argv0, full_path) < 0)
429
430 if (ret == -1)
431 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
432 progname, "pg_createsubscriber", full_path);
433 else
434 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
435 progname, full_path, "pg_createsubscriber");
436 }
437
438 pg_log_debug("%s path is: %s", progname, exec_path);
439
440 return exec_path;
441}
int find_my_exec(const char *argv0, char *retpath)
Definition exec.c:161
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition exec.c:311
void * pg_malloc(size_t size)
Definition fe_memutils.c:53
static const char * progname
static char * argv0
Definition pg_ctl.c:94
static char * exec_path
Definition pg_ctl.c:89
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45

References argv0, exec_path, fb(), find_my_exec(), find_other_exec(), MAXPGPATH, pg_fatal, pg_log_debug, pg_malloc(), progname, psprintf(), and strlcpy().

Referenced by main().

◆ get_primary_sysid()

static uint64 get_primary_sysid ( const char * conninfo)
static

Definition at line 641 of file pg_createsubscriber.c.

642{
643 PGconn *conn;
644 PGresult *res;
646
647 pg_log_info("getting system identifier from publisher");
648
649 conn = connect_database(conninfo, true);
650
651 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
653 {
654 pg_log_error("could not get system identifier: %s",
657 }
658 if (PQntuples(res) != 1)
659 {
660 pg_log_error("could not get system identifier: got %d rows, expected %d row",
661 PQntuples(res), 1);
663 }
664
665 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
666
667 pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
668
669 PQclear(res);
671
672 return sysid;
673}
uint64_t uint64
Definition c.h:625

References conn, connect_database(), disconnect_database(), fb(), pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, and PQresultStatus.

Referenced by main().

◆ get_publisher_databases()

static void get_publisher_databases ( struct CreateSubscriberOptions * opt,
bool  dbnamespecified 
)
static

Definition at line 2187 of file pg_createsubscriber.c.

2189{
2190 PGconn *conn;
2191 PGresult *res;
2192
2193 /* If a database name was specified, just connect to it. */
2194 if (dbnamespecified)
2196 else
2197 {
2198 /* Otherwise, try postgres first and then template1. */
2199 char *conninfo;
2200
2201 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2202 conn = connect_database(conninfo, false);
2203 pg_free(conninfo);
2204 if (!conn)
2205 {
2206 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2207 conn = connect_database(conninfo, true);
2208 pg_free(conninfo);
2209 }
2210 }
2211
2212 res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2213 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2214 {
2215 pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2216 PQclear(res);
2218 }
2219
2220 for (int i = 0; i < PQntuples(res); i++)
2221 {
2222 const char *dbname = PQgetvalue(res, i, 0);
2223
2225
2226 /* Increment num_dbs to reflect multiple --database options */
2227 num_dbs++;
2228 }
2229
2230 PQclear(res);
2231 disconnect_database(conn, false);
2232}
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition simple_list.c:63
SimpleStringList database_names

References concat_conninfo_dbname(), conn, connect_database(), CreateSubscriberOptions::database_names, dbname, disconnect_database(), fb(), i, num_dbs, pg_free(), pg_log_error, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, CreateSubscriberOptions::pub_conninfo_str, and simple_string_list_append().

Referenced by main().

◆ get_standby_sysid()

static uint64 get_standby_sysid ( const char * datadir)
static

Definition at line 681 of file pg_createsubscriber.c.

682{
684 bool crc_ok;
686
687 pg_log_info("getting system identifier from subscriber");
688
690 if (!crc_ok)
691 pg_fatal("control file appears to be corrupt");
692
693 sysid = cf->system_identifier;
694
695 pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
696
697 pg_free(cf);
698
699 return sysid;
700}
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)

References datadir, fb(), get_controlfile(), pg_fatal, pg_free(), and pg_log_info.

Referenced by main().

◆ get_sub_conninfo()

static char * get_sub_conninfo ( const struct CreateSubscriberOptions * opt)
static

Definition at line 387 of file pg_createsubscriber.c.

388{
390 char *ret;
391
392 appendConnStrItem(buf, "port", opt->sub_port);
393#if !defined(WIN32)
394 appendConnStrItem(buf, "host", opt->socket_dir);
395#endif
396 if (opt->sub_username != NULL)
397 appendConnStrItem(buf, "user", opt->sub_username);
398 appendConnStrItem(buf, "fallback_application_name", progname);
399
400 ret = pg_strdup(buf->data);
401
403
404 return ret;
405}

References appendConnStrItem(), buf, createPQExpBuffer(), destroyPQExpBuffer(), fb(), pg_strdup(), progname, CreateSubscriberOptions::socket_dir, CreateSubscriberOptions::sub_port, and CreateSubscriberOptions::sub_username.

Referenced by main().

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 2235 of file pg_createsubscriber.c.

2236{
2237 static struct option long_options[] =
2238 {
2239 {"all", no_argument, NULL, 'a'},
2240 {"database", required_argument, NULL, 'd'},
2241 {"pgdata", required_argument, NULL, 'D'},
2242 {"logdir", required_argument, NULL, 'l'},
2243 {"dry-run", no_argument, NULL, 'n'},
2244 {"subscriber-port", required_argument, NULL, 'p'},
2245 {"publisher-server", required_argument, NULL, 'P'},
2246 {"socketdir", required_argument, NULL, 's'},
2247 {"recovery-timeout", required_argument, NULL, 't'},
2248 {"enable-two-phase", no_argument, NULL, 'T'},
2249 {"subscriber-username", required_argument, NULL, 'U'},
2250 {"verbose", no_argument, NULL, 'v'},
2251 {"version", no_argument, NULL, 'V'},
2252 {"help", no_argument, NULL, '?'},
2253 {"config-file", required_argument, NULL, 1},
2254 {"publication", required_argument, NULL, 2},
2255 {"replication-slot", required_argument, NULL, 3},
2256 {"subscription", required_argument, NULL, 4},
2257 {"clean", required_argument, NULL, 5},
2258 {NULL, 0, NULL, 0}
2259 };
2260
2261 struct CreateSubscriberOptions opt = {0};
2262
2263 int c;
2264 int option_index;
2265
2266 char *pub_base_conninfo;
2267 char *sub_base_conninfo;
2268 char *dbname_conninfo = NULL;
2269
2272 struct stat statbuf;
2273
2274 char *consistent_lsn;
2275
2276 char pidfile[MAXPGPATH];
2277
2278 pg_logging_init(argv[0]);
2280 progname = get_progname(argv[0]);
2281 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2282
2283 if (argc > 1)
2284 {
2285 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2286 {
2287 usage();
2288 exit(0);
2289 }
2290 else if (strcmp(argv[1], "-V") == 0
2291 || strcmp(argv[1], "--version") == 0)
2292 {
2293 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2294 exit(0);
2295 }
2296 }
2297
2298 /* Default settings */
2300 opt.config_file = NULL;
2301 opt.log_dir = NULL;
2302 opt.pub_conninfo_str = NULL;
2303 opt.socket_dir = NULL;
2305 opt.sub_username = NULL;
2306 opt.two_phase = false;
2308 {
2309 0
2310 };
2311 opt.recovery_timeout = 0;
2312 opt.all_dbs = false;
2313
2314 /*
2315 * Don't allow it to be run as root. It uses pg_ctl which does not allow
2316 * it either.
2317 */
2318#ifndef WIN32
2319 if (geteuid() == 0)
2320 {
2321 pg_log_error("cannot be executed by \"root\"");
2322 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2323 progname);
2324 exit(1);
2325 }
2326#endif
2327
2329
2330 while ((c = getopt_long(argc, argv, "ad:D:l:np:P:s:t:TU:v",
2331 long_options, &option_index)) != -1)
2332 {
2333 switch (c)
2334 {
2335 case 'a':
2336 opt.all_dbs = true;
2337 break;
2338 case 'd':
2340 {
2342 num_dbs++;
2343 }
2344 else
2345 pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2346 break;
2347 case 'D':
2350 break;
2351 case 'l':
2352 opt.log_dir = pg_strdup(optarg);
2354 break;
2355 case 'n':
2356 dry_run = true;
2357 break;
2358 case 'p':
2359 opt.sub_port = pg_strdup(optarg);
2360 break;
2361 case 'P':
2363 break;
2364 case 's':
2367 break;
2368 case 't':
2370 break;
2371 case 'T':
2372 opt.two_phase = true;
2373 break;
2374 case 'U':
2376 break;
2377 case 'v':
2379 break;
2380 case 1:
2382 break;
2383 case 2:
2385 num_pubs++;
2386 break;
2387 case 3:
2389 {
2391 num_replslots++;
2392 }
2393 else
2394 pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2395 break;
2396 case 4:
2398 {
2400 num_subs++;
2401 }
2402 else
2403 pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2404 break;
2405 case 5:
2408 else
2409 pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
2410 break;
2411 default:
2412 /* getopt_long already emitted a complaint */
2413 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2414 exit(1);
2415 }
2416 }
2417
2418 /* Validate that --all is not used with incompatible options */
2419 if (opt.all_dbs)
2420 {
2421 char *bad_switch = NULL;
2422
2423 if (num_dbs > 0)
2424 bad_switch = "--database";
2425 else if (num_pubs > 0)
2426 bad_switch = "--publication";
2427 else if (num_replslots > 0)
2428 bad_switch = "--replication-slot";
2429 else if (num_subs > 0)
2430 bad_switch = "--subscription";
2431
2432 if (bad_switch)
2433 {
2434 pg_log_error("options %s and %s cannot be used together",
2435 bad_switch, "-a/--all");
2436 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2437 exit(1);
2438 }
2439 }
2440
2441 /* Any non-option arguments? */
2442 if (optind < argc)
2443 {
2444 pg_log_error("too many command-line arguments (first is \"%s\")",
2445 argv[optind]);
2446 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2447 exit(1);
2448 }
2449
2450 /* Required arguments */
2451 if (subscriber_dir == NULL)
2452 {
2453 pg_log_error("no subscriber data directory specified");
2454 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2455 exit(1);
2456 }
2457
2458 /* If socket directory is not provided, use the current directory */
2459 if (opt.socket_dir == NULL)
2460 {
2461 char cwd[MAXPGPATH];
2462
2463 if (!getcwd(cwd, MAXPGPATH))
2464 pg_fatal("could not determine current directory");
2465 opt.socket_dir = pg_strdup(cwd);
2467 }
2468
2469 /*
2470 * Parse connection string. Build a base connection string that might be
2471 * reused by multiple databases.
2472 */
2473 if (opt.pub_conninfo_str == NULL)
2474 {
2475 /*
2476 * TODO use primary_conninfo (if available) from subscriber and
2477 * extract publisher connection string. Assume that there are
2478 * identical entries for physical and logical replication. If there is
2479 * not, we would fail anyway.
2480 */
2481 pg_log_error("no publisher connection string specified");
2482 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2483 exit(1);
2484 }
2485
2486 if (opt.log_dir != NULL)
2487 {
2488 char *internal_log_file;
2490
2492
2493 /*
2494 * Set mask based on PGDATA permissions, needed for the creation of
2495 * the output directories with correct permissions, similar to
2496 * pg_ctl and pg_upgrade.
2497 *
2498 * Don't error here if the data directory cannot be stat'd. Upcoming
2499 * checks for the data directory would raise the fatal error later.
2500 */
2503
2506
2509 pg_fatal("could not open log file \"%s\": %m", internal_log_file);
2510
2512
2514 }
2515
2516 if (dry_run)
2517 pg_log_info("Executing in dry-run mode.\n"
2518 "The target directory will not be modified.");
2519
2520 pg_log_info("validating publisher connection string");
2523 if (pub_base_conninfo == NULL)
2524 exit(1);
2525
2526 pg_log_info("validating subscriber connection string");
2528
2529 /*
2530 * Fetch all databases from the source (publisher) and treat them as if
2531 * the user had specified multiple --database options, one for each source
2532 * database.
2533 */
2534 if (opt.all_dbs)
2535 {
2537
2539 }
2540
2541 if (opt.database_names.head == NULL)
2542 {
2543 pg_log_info("no database was specified");
2544
2545 /*
2546 * Try to obtain the dbname from the publisher conninfo. If dbname
2547 * parameter is not available, error out.
2548 */
2549 if (dbname_conninfo)
2550 {
2552 num_dbs++;
2553
2554 pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2556 }
2557 else
2558 {
2559 pg_log_error("no database name specified");
2560 pg_log_error_hint("Try \"%s --help\" for more information.",
2561 progname);
2562 exit(1);
2563 }
2564 }
2565
2566 /* Number of object names must match number of databases */
2567 if (num_pubs > 0 && num_pubs != num_dbs)
2568 {
2569 pg_log_error("wrong number of publication names specified");
2570 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2571 num_pubs, num_dbs);
2572 exit(1);
2573 }
2574 if (num_subs > 0 && num_subs != num_dbs)
2575 {
2576 pg_log_error("wrong number of subscription names specified");
2577 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2578 num_subs, num_dbs);
2579 exit(1);
2580 }
2581 if (num_replslots > 0 && num_replslots != num_dbs)
2582 {
2583 pg_log_error("wrong number of replication slot names specified");
2584 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2586 exit(1);
2587 }
2588
2589 /* Verify the object types specified for removal from the subscriber */
2590 for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2591 {
2592 if (pg_strcasecmp(cell->val, "publications") == 0)
2594 else
2595 {
2596 pg_log_error("invalid object type \"%s\" specified for option %s",
2597 cell->val, "--clean");
2598 pg_log_error_hint("The valid value is: \"%s\"", "publications");
2599 exit(1);
2600 }
2601 }
2602
2603 /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2604 pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2605 pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2606
2607 /* Rudimentary check for a data directory */
2609
2611
2612 /*
2613 * Store database information for publisher and subscriber. It should be
2614 * called before atexit() because its return is used in the
2615 * cleanup_objects_atexit().
2616 */
2618
2619 /* Register a function to clean up objects in case of failure */
2621
2622 /*
2623 * Check if the subscriber data directory has the same system identifier
2624 * as the publisher data directory.
2625 */
2628 if (pub_sysid != sub_sysid)
2629 pg_fatal("subscriber data directory is not a copy of the source database cluster");
2630
2631 /* Subscriber PID file */
2632 snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2633
2634 /*
2635 * The standby server must not be running. If the server is started under
2636 * service manager and pg_createsubscriber stops it, the service manager
2637 * might react to this action and start the server again. Therefore,
2638 * refuse to proceed if the server is running to avoid possible failures.
2639 */
2640 if (stat(pidfile, &statbuf) == 0)
2641 {
2642 pg_log_error("standby server is running");
2643 pg_log_error_hint("Stop the standby server and try again.");
2644 exit(1);
2645 }
2646
2647 /*
2648 * Start a short-lived standby server with temporary parameters (provided
2649 * by command-line options). The goal is to avoid connections during the
2650 * transformation steps.
2651 */
2652 pg_log_info("starting the standby server with command-line options");
2653 start_standby_server(&opt, true, false);
2654
2655 /* Check if the standby server is ready for logical replication */
2657
2658 /* Check if the primary server is ready for logical replication */
2660
2661 /*
2662 * Stop the target server. The recovery process requires that the server
2663 * reaches a consistent state before targeting the recovery stop point.
2664 * Make sure a consistent state is reached (stopping the target server
2665 * guarantees it) *before* creating the replication slots in
2666 * setup_publisher().
2667 */
2668 pg_log_info("stopping the subscriber");
2670
2671 /* Create the required objects for each database on publisher */
2673
2674 /* Write the required recovery parameters */
2676
2677 /*
2678 * Start subscriber so the recovery parameters will take effect. Wait
2679 * until accepting connections. We don't want to start logical replication
2680 * during setup.
2681 */
2682 pg_log_info("starting the subscriber");
2683 start_standby_server(&opt, true, true);
2684
2685 /* Wait for the subscriber to be promoted */
2687
2688 /*
2689 * Create the subscription for each database on subscriber. It does not
2690 * enable it immediately because it needs to adjust the replication start
2691 * point to the LSN reported by setup_publisher(). It also cleans up
2692 * publications created by this tool and replication to the standby.
2693 */
2695
2696 /* Remove primary_slot_name if it exists on primary */
2698
2699 /* Remove failover replication slots if they exist on subscriber */
2701
2702 /* Stop the subscriber */
2703 pg_log_info("stopping the subscriber");
2705
2706 /* Change system identifier from subscriber */
2708
2709 success = true;
2710
2711 pg_log_info("Done!");
2712
2713 return 0;
2714}
#define PG_TEXTDOMAIN(domain)
Definition c.h:1303
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition exec.c:430
int pg_mode_mask
Definition file_perm.c:25
bool GetDataDirectoryCreatePerm(const char *dataDir)
#define PG_MODE_MASK_OWNER
Definition file_perm.h:24
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition getopt_long.c:60
#define no_argument
Definition getopt_long.h:25
#define required_argument
Definition getopt_long.h:26
void pg_logging_increase_verbosity(void)
Definition logging.c:187
void pg_logging_set_logfile(FILE *logfile)
Definition logging.c:210
void pg_logging_init(const char *argv0)
Definition logging.c:85
void pg_logging_set_level(enum pg_log_level new_level)
Definition logging.c:178
@ PG_LOG_WARNING
Definition logging.h:38
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
#define INTERNAL_LOG_FILE_NAME
static char * pg_ctl_path
static char * logdir
static char * get_exec_path(const char *argv0, const char *progname)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static void check_data_directory(const char *datadir)
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)
static char * pg_resetwal_path
static void make_output_dirs(const char *log_basedir)
static void usage(void)
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
PGDLLIMPORT int optind
Definition getopt.c:47
PGDLLIMPORT char * optarg
Definition getopt.c:49
int pg_strcasecmp(const char *s1, const char *s2)
void canonicalize_path(char *path)
Definition path.c:337
const char * get_progname(const char *argv0)
Definition path.c:669
char * c
void get_restricted_token(void)
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition simple_list.c:87
SimpleStringList objecttypes_to_clean
SimpleStringList replslot_names
SimpleStringListCell * head
Definition simple_list.h:42

References CreateSubscriberOptions::all_dbs, canonicalize_path(), check_data_directory(), check_publisher(), check_subscriber(), cleanup_objects_atexit(), CreateSubscriberOptions::config_file, CreateSubscriberOptions::database_names, LogicalRepInfos::dbinfo, dbinfos, DEFAULT_SUB_PORT, drop_failover_replication_slots(), drop_primary_replication_slot(), dry_run, fb(), get_base_conninfo(), get_exec_path(), get_primary_sysid(), get_progname(), get_publisher_databases(), get_restricted_token(), get_standby_sysid(), get_sub_conninfo(), GetDataDirectoryCreatePerm(), getopt_long(), SimpleStringList::head, INTERNAL_LOG_FILE_NAME, CreateSubscriberOptions::log_dir, logdir, make_output_dirs(), MAXPGPATH, modify_subscriber_sysid(), no_argument, num_dbs, num_pubs, num_replslots, num_subs, OBJECTTYPE_PUBLICATIONS, CreateSubscriberOptions::objecttypes_to_clean, LogicalRepInfos::objecttypes_to_clean, optarg, optind, pg_ctl_path, pg_fatal, pg_free(), pg_log_error, pg_log_error_detail, pg_log_error_hint, pg_log_info, PG_LOG_WARNING, pg_logging_increase_verbosity(), pg_logging_init(), pg_logging_set_level(), pg_logging_set_logfile(), pg_mode_mask, PG_MODE_MASK_OWNER, pg_resetwal_path, pg_strcasecmp(), pg_strdup(), PG_TEXTDOMAIN, primary_slot_name, progname, psprintf(), CreateSubscriberOptions::pub_conninfo_str, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, CreateSubscriberOptions::recovery_timeout, CreateSubscriberOptions::replslot_names, required_argument, set_pglocale_pgservice(), setup_publisher(), setup_recovery(), setup_subscriber(), simple_string_list_append(), simple_string_list_member(), snprintf, CreateSubscriberOptions::socket_dir, start_standby_server(), stat, stop_standby_server(), store_pub_sub_info(), CreateSubscriberOptions::sub_names, CreateSubscriberOptions::sub_port, CreateSubscriberOptions::sub_username, LogicalRepInfo::subconninfo, subscriber_dir, success, CreateSubscriberOptions::two_phase, LogicalRepInfos::two_phase, usage(), and 
wait_for_end_recovery().

◆ make_output_dirs()

static void make_output_dirs ( const char * log_basedir)
static

Definition at line 978 of file pg_createsubscriber.c.

979{
980 char timestamp[128];
981 struct timeval tval;
982 time_t now;
983 struct tm tmbuf;
984
985 /* Generate timestamp */
987 now = tval.tv_sec;
988
989 strftime(timestamp, sizeof(timestamp), "%Y%m%dT%H%M%S",
990 localtime_r(&now, &tmbuf));
991
992 /* Append milliseconds */
994 sizeof(timestamp) - strlen(timestamp), ".%03u",
995 (unsigned int) (tval.tv_usec / 1000));
996
997 /* Build timestamp directory path */
999
1000 /* Create base directory (ignore if exists) */
1002 pg_fatal("could not create directory \"%s\": %m", log_basedir);
1003
1004 /* Create a timestamp-named subdirectory under the base directory */
1006 pg_fatal("could not create directory \"%s\": %m", logdir);
1007}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1613
int pg_dir_create_mode
Definition file_perm.c:18
static struct pg_tm tm
Definition localtime.c:104
int64 timestamp
#define mkdir(a, b)
Definition win32_port.h:80
int gettimeofday(struct timeval *tp, void *tzp)

References fb(), gettimeofday(), logdir, mkdir, now(), pg_dir_create_mode, pg_fatal, psprintf(), snprintf, and tm.

Referenced by main().

◆ modify_subscriber_sysid()

static void modify_subscriber_sysid ( const struct CreateSubscriberOptions * opt)
static

Definition at line 708 of file pg_createsubscriber.c.

709{
711 bool crc_ok;
712 struct timeval tv;
713
714 char *out_file;
715 char *cmd_str;
716
717 pg_log_info("modifying system identifier of subscriber");
718
720 if (!crc_ok)
721 pg_fatal("control file appears to be corrupt");
722
723 /*
724 * Select a new system identifier.
725 *
726 * XXX this code was extracted from BootStrapXLOG().
727 */
728 gettimeofday(&tv, NULL);
729 cf->system_identifier = ((uint64) tv.tv_sec) << 32;
730 cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
731 cf->system_identifier |= getpid() & 0xFFF;
732
733 if (dry_run)
734 pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
735 cf->system_identifier);
736 else
737 {
739 pg_log_info("system identifier is %" PRIu64 " on subscriber",
740 cf->system_identifier);
741 }
742
743 if (dry_run)
744 pg_log_info("dry-run: would run pg_resetwal on the subscriber");
745 else
746 pg_log_info("running pg_resetwal on the subscriber");
747
748 /*
749 * Redirecting the output to the logfile if specified. Since the output
750 * would be very short, around one line, we do not provide a separate file
751 * for it; it's done as a part of the server log.
752 */
753 if (opt->log_dir)
755 else
757
758 cmd_str = psprintf("\"%s\" -D \"%s\" >> \"%s\"", pg_resetwal_path,
760 if (opt->log_dir)
762
763 pg_log_debug("pg_resetwal command is: %s", cmd_str);
764
765 if (!dry_run)
766 {
767 int rc = system(cmd_str);
768
769 if (rc == 0)
770 pg_log_info("successfully reset WAL on the subscriber");
771 else
772 pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
773 }
774
775 pg_free(cf);
777}
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
#define SERVER_LOG_FILE_NAME
#define DEVNULL
Definition port.h:162
char * wait_result_to_str(int exitstatus)
Definition wait_error.c:33

References DEVNULL, dry_run, fb(), get_controlfile(), gettimeofday(), CreateSubscriberOptions::log_dir, logdir, pg_fatal, pg_free(), pg_log_debug, pg_log_info, pg_resetwal_path, psprintf(), SERVER_LOG_FILE_NAME, subscriber_dir, update_controlfile(), and wait_result_to_str().

Referenced by main().

◆ pg_ctl_status()

static void pg_ctl_status ( const char * pg_ctl_cmd,
int  rc 
)
static

Definition at line 1643 of file pg_createsubscriber.c.

1644{
1645 if (rc != 0)
1646 {
1647 if (WIFEXITED(rc))
1648 {
1649 pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
1650 }
1651 else if (WIFSIGNALED(rc))
1652 {
1653#if defined(WIN32)
1654 pg_log_error("pg_ctl was terminated by exception 0x%X",
1655 WTERMSIG(rc));
1656 pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1657#else
1658 pg_log_error("pg_ctl was terminated by signal %d: %s",
1659 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1660#endif
1661 }
1662 else
1663 {
1664 pg_log_error("pg_ctl exited with unrecognized status %d", rc);
1665 }
1666
1667 pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
1668 exit(1);
1669 }
1670}
const char * pg_strsignal(int signum)
Definition pgstrsignal.c:39
#define WIFEXITED(w)
Definition win32_port.h:150
#define WIFSIGNALED(w)
Definition win32_port.h:151
#define WTERMSIG(w)
Definition win32_port.h:153
#define WEXITSTATUS(w)
Definition win32_port.h:152

References fb(), pg_log_error, pg_log_error_detail, pg_strsignal(), WEXITSTATUS, WIFEXITED, WIFSIGNALED, and WTERMSIG.

Referenced by start_standby_server(), and stop_standby_server().

◆ server_is_in_recovery()

static bool server_is_in_recovery ( PGconn * conn)
static

Definition at line 955 of file pg_createsubscriber.c.

956{
957 PGresult *res;
958 int ret;
959
960 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
961
963 {
964 pg_log_error("could not obtain recovery progress: %s",
967 }
968
969
970 ret = strcmp("t", PQgetvalue(res, 0, 0));
971
972 PQclear(res);
973
974 return ret == 0;
975}

References conn, disconnect_database(), fb(), pg_log_error, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, and PQresultStatus.

Referenced by check_publisher(), check_subscriber(), and wait_for_end_recovery().

◆ set_replication_progress()

static void set_replication_progress ( PGconn * conn,
const struct LogicalRepInfo * dbinfo,
const char * lsn
)
static

Definition at line 2048 of file pg_createsubscriber.c.

2049{
2051 PGresult *res;
2052 Oid suboid;
2053 char *subname;
2054 char *dbname;
2055 char *originname;
2056 char *lsnstr;
2057
2058 Assert(conn != NULL);
2059
2060 subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
2061 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
2062
2064 "SELECT s.oid FROM pg_catalog.pg_subscription s "
2065 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
2066 "WHERE s.subname = %s AND d.datname = %s",
2067 subname, dbname);
2068
2069 res = PQexec(conn, str->data);
2070 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2071 {
2072 pg_log_error("could not obtain subscription OID: %s",
2075 }
2076
2077 if (PQntuples(res) != 1 && !dry_run)
2078 {
2079 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
2080 PQntuples(res), 1);
2082 }
2083
2084 if (dry_run)
2085 {
2088 }
2089 else
2090 {
2091 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2092 lsnstr = psprintf("%s", lsn);
2093 }
2094
2095 PQclear(res);
2096
2097 /*
2098 * The origin name is defined as pg_%u. %u is the subscription OID. See
2099 * ApplyWorkerMain().
2100 */
2101 originname = psprintf("pg_%u", suboid);
2102
2103 if (dry_run)
2104 pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2105 originname, lsnstr, dbinfo->dbname);
2106 else
2107 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2108 originname, lsnstr, dbinfo->dbname);
2109
2112 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
2114
2115 pg_log_debug("command is: %s", str->data);
2116
2117 if (!dry_run)
2118 {
2119 res = PQexec(conn, str->data);
2120 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2121 {
2122 pg_log_error("could not set replication progress for subscription \"%s\": %s",
2123 dbinfo->subname, PQresultErrorMessage(res));
2125 }
2126 PQclear(res);
2127 }
2128
2132 pg_free(lsnstr);
2134}
#define InvalidOid
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), InvalidOid, InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, psprintf(), resetPQExpBuffer(), str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ setup_publisher()

static char * setup_publisher ( struct LogicalRepInfo * dbinfo)
static

Definition at line 867 of file pg_createsubscriber.c.

868{
869 char *lsn = NULL;
870
871 pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
872
873 for (int i = 0; i < num_dbs; i++)
874 {
875 PGconn *conn;
876 char *genname = NULL;
877
878 conn = connect_database(dbinfo[i].pubconninfo, true);
879
880 /*
881 * If an object name was not specified as command-line options, assign
882 * a generated object name. The replication slot has a different rule.
883 * The subscription name is assigned to the replication slot name if
884 * no replication slot is specified. It follows the same rule as
885 * CREATE SUBSCRIPTION.
886 */
887 if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
889 if (num_pubs == 0)
890 dbinfo[i].pubname = pg_strdup(genname);
891 if (num_subs == 0)
892 dbinfo[i].subname = pg_strdup(genname);
893 if (num_replslots == 0)
894 dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
895
896 if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
897 {
898 /* Reuse existing publication on publisher. */
899 pg_log_info("using existing publication \"%s\" in database \"%s\"",
900 dbinfo[i].pubname, dbinfo[i].dbname);
901 /* Don't remove pre-existing publication if an error occurs. */
902 dbinfo[i].made_publication = false;
903 }
904 else
905 {
906 /*
907 * Create publication on publisher. This step should be executed
908 * *before* promoting the subscriber to avoid any transactions
909 * between consistent LSN and the new publication rows (such
910 * transactions wouldn't see the new publication rows resulting in
911 * an error).
912 */
913 create_publication(conn, &dbinfo[i]);
914 }
915
916 /* Create replication slot on publisher */
917 if (lsn)
918 pg_free(lsn);
919 lsn = create_logical_replication_slot(conn, &dbinfo[i]);
920 if (lsn == NULL && !dry_run)
921 exit(1);
922
923 /*
924 * Since we are using the LSN returned by the last replication slot as
925 * recovery_target_lsn, this LSN is ahead of the current WAL position
926 * and the recovery waits until the publisher writes a WAL record to
927 * reach the target and ends the recovery. On idle systems, this wait
928 * time is unpredictable and could lead to failure in promoting the
929 * subscriber. To avoid that, insert a harmless WAL record.
930 */
931 if (i == num_dbs - 1 && !dry_run)
932 {
933 PGresult *res;
934
935 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
937 {
938 pg_log_error("could not write an additional WAL record: %s",
941 }
942 PQclear(res);
943 }
944
946 }
947
948 return lsn;
949}
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
static bool find_publication(PGconn *conn, const char *pubname, const char *dbname)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static char * generate_object_name(PGconn *conn)
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition pg_prng.c:89

References conn, connect_database(), create_logical_replication_slot(), create_publication(), dbname, disconnect_database(), dry_run, fb(), find_publication(), generate_object_name(), i, LogicalRepInfo::made_publication, num_dbs, num_pubs, num_replslots, num_subs, pg_free(), pg_log_error, pg_log_info, pg_prng_seed(), pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQresultErrorMessage, PQresultStatus, prng_state, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, LogicalRepInfo::subname, and subname.

Referenced by main().

◆ setup_recovery()

static void setup_recovery ( const struct LogicalRepInfo * dbinfo,
const char * datadir,
const char * lsn
)
static

Definition at line 1380 of file pg_createsubscriber.c.

1381{
1382 PGconn *conn;
1384
1385 /*
1386 * Despite of the recovery parameters will be written to the subscriber,
1387 * use a publisher connection. The primary_conninfo is generated using the
1388 * connection settings.
1389 */
1390 conn = connect_database(dbinfo[0].pubconninfo, true);
1391
1392 /*
1393 * Write recovery parameters.
1394 *
1395 * The subscriber is not running yet. In dry run mode, the recovery
1396 * parameters *won't* be written. An invalid LSN is used for printing
1397 * purposes. Additional recovery parameters are added here. It avoids
1398 * unexpected behavior such as end of recovery as soon as a consistent
1399 * state is reached (recovery_target) and failure due to multiple recovery
1400 * targets (name, time, xid, LSN).
1401 */
1403 appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1405 "recovery_target_timeline = 'latest'\n");
1406
1407 /*
1408 * Set recovery_target_inclusive = false to avoid reapplying the
1409 * transaction committed at 'lsn' after subscription is enabled. This is
1410 * because the provided 'lsn' is also used as the replication start point
1411 * for the subscription. So, the server can send the transaction committed
1412 * at that 'lsn' after replication is started which can lead to applying
1413 * the same transaction twice if we keep recovery_target_inclusive = true.
1414 */
1416 "recovery_target_inclusive = false\n");
1418 "recovery_target_action = promote\n");
1419 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1420 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1421 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1422
1423 if (dry_run)
1424 {
1425 appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
1427 "recovery_target_lsn = '%X/%08X'\n",
1429 }
1430 else
1431 {
1432 appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1433 lsn);
1434 }
1435
1436 pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1437
1438 if (!dry_run)
1439 {
1441 FILE *fd;
1442
1443 /* Write the recovery parameters to INCLUDED_CONF_FILE */
1446 fd = fopen(conf_filename, "w");
1447 if (fd == NULL)
1448 pg_fatal("could not open file \"%s\": %m", conf_filename);
1449
1451 pg_fatal("could not write to file \"%s\": %m", conf_filename);
1452
1453 fclose(fd);
1454 recovery_params_set = true;
1455
1456 /* Include conditionally the recovery parameters. */
1459 "include_if_exists '" INCLUDED_CONF_FILE "'\n");
1461 }
1462
1463 disconnect_database(conn, false);
1464}
static PQExpBuffer recoveryconfcontents
static int fd(const char *x, int i)
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)

References appendPQExpBuffer(), appendPQExpBufferStr(), conn, connect_database(), PQExpBufferData::data, datadir, disconnect_database(), dry_run, fb(), fd(), GenerateRecoveryConfig(), INCLUDED_CONF_FILE, InvalidXLogRecPtr, PQExpBufferData::len, LSN_FORMAT_ARGS, MAXPGPATH, pg_fatal, pg_log_debug, recovery_params_set, recoveryconfcontents, resetPQExpBuffer(), snprintf, and WriteRecoveryConfig().

Referenced by main().

◆ setup_subscriber()

static void setup_subscriber ( struct LogicalRepInfo * dbinfo,
const char * consistent_lsn
)
static

Definition at line 1344 of file pg_createsubscriber.c.

1345{
1346 for (int i = 0; i < num_dbs; i++)
1347 {
1348 PGconn *conn;
1349
1350 /* Connect to subscriber. */
1351 conn = connect_database(dbinfo[i].subconninfo, true);
1352
1353 /*
1354 * We don't need the pre-existing subscriptions on the newly formed
1355 * subscriber. They can connect to other publisher nodes and either
1356 * get some unwarranted data or can lead to ERRORs in connecting to
1357 * such nodes.
1358 */
1360
1361 /* Check and drop the required publications in the given database. */
1363
1364 create_subscription(conn, &dbinfo[i]);
1365
1366 /* Set the replication progress to the correct LSN */
1368
1369 /* Enable subscription */
1370 enable_subscription(conn, &dbinfo[i]);
1371
1372 disconnect_database(conn, false);
1373 }
1374}
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)

References check_and_drop_existing_subscriptions(), check_and_drop_publications(), conn, connect_database(), create_subscription(), disconnect_database(), enable_subscription(), fb(), i, num_dbs, and set_replication_progress().

Referenced by main().

◆ start_standby_server()

static void start_standby_server ( const struct CreateSubscriberOptions * opt,
bool  restricted_access,
bool  restrict_logical_worker 
)
static

Definition at line 1673 of file pg_createsubscriber.c.

1675{
1677 int rc;
1678
1679 appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1681 appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1682
1683 /* Prevent unintended slot invalidation */
1684 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1685
1687 {
1688 appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1689#if !defined(WIN32)
1690
1691 /*
1692 * An empty listen_addresses list means the server does not listen on
1693 * any IP interfaces; only Unix-domain sockets can be used to connect
1694 * to the server. Prevent external connections to minimize the chance
1695 * of failure.
1696 */
1697 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1698 if (opt->socket_dir)
1699 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1700 opt->socket_dir);
1702#endif
1703 }
1704 if (opt->config_file != NULL)
1705 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1706 opt->config_file);
1707
1708 /* Suppress to start logical replication if requested */
1710 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1711
1712 if (opt->log_dir)
1714
1715 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1716 rc = system(pg_ctl_cmd->data);
1717 pg_ctl_status(pg_ctl_cmd->data, rc);
1718 standby_running = true;
1720 pg_log_info("server was started");
1721}
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
void appendShellString(PQExpBuffer buf, const char *str)

References appendPQExpBuffer(), appendPQExpBufferChar(), appendPQExpBufferStr(), appendShellString(), CreateSubscriberOptions::config_file, createPQExpBuffer(), destroyPQExpBuffer(), fb(), CreateSubscriberOptions::log_dir, logdir, pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, SERVER_LOG_FILE_NAME, CreateSubscriberOptions::socket_dir, standby_running, CreateSubscriberOptions::sub_port, and subscriber_dir.

Referenced by main().

◆ stop_standby_server()

static void stop_standby_server ( const char * datadir)
static

Definition at line 1724 of file pg_createsubscriber.c.

1725{
1726 char *pg_ctl_cmd;
1727 int rc;
1728
1729 pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1730 datadir);
1731 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1732 rc = system(pg_ctl_cmd);
1734 standby_running = false;
1735 pg_log_info("server was stopped");
1736}

References datadir, fb(), pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, psprintf(), and standby_running.

Referenced by cleanup_objects_atexit(), main(), and wait_for_end_recovery().

◆ store_pub_sub_info()

static struct LogicalRepInfo * store_pub_sub_info ( const struct CreateSubscriberOptions * opt,
const char * pub_base_conninfo,
const char * sub_base_conninfo
)
static

Definition at line 513 of file pg_createsubscriber.c.

516{
517 struct LogicalRepInfo *dbinfo;
521 int i = 0;
522
523 dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
524
525 if (num_pubs > 0)
526 pubcell = opt->pub_names.head;
527 if (num_subs > 0)
528 subcell = opt->sub_names.head;
529 if (num_replslots > 0)
531
532 for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
533 {
534 char *conninfo;
535
536 /* Fill publisher attributes */
537 conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
538 dbinfo[i].pubconninfo = conninfo;
539 dbinfo[i].dbname = cell->val;
540 if (num_pubs > 0)
541 dbinfo[i].pubname = pubcell->val;
542 else
543 dbinfo[i].pubname = NULL;
544 if (num_replslots > 0)
545 dbinfo[i].replslotname = replslotcell->val;
546 else
547 dbinfo[i].replslotname = NULL;
548 dbinfo[i].made_replslot = false;
549 dbinfo[i].made_publication = false;
550 /* Fill subscriber attributes */
551 conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
552 dbinfo[i].subconninfo = conninfo;
553 if (num_subs > 0)
554 dbinfo[i].subname = subcell->val;
555 else
556 dbinfo[i].subname = NULL;
557 /* Other fields will be filled later */
558
559 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
560 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
561 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
562 dbinfo[i].pubconninfo);
563 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
564 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
565 dbinfo[i].subconninfo,
566 dbinfos.two_phase ? "true" : "false");
567
568 if (num_pubs > 0)
569 pubcell = pubcell->next;
570 if (num_subs > 0)
571 subcell = subcell->next;
572 if (num_replslots > 0)
574
575 i++;
576 }
577
578 return dbinfo;
579}
#define pg_malloc_array(type, count)
Definition fe_memutils.h:66
static bool two_phase

References concat_conninfo_dbname(), CreateSubscriberOptions::database_names, dbinfos, LogicalRepInfo::dbname, fb(), SimpleStringList::head, i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, num_dbs, num_pubs, num_replslots, num_subs, pg_log_debug, pg_malloc_array, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, CreateSubscriberOptions::replslot_names, LogicalRepInfo::replslotname, CreateSubscriberOptions::sub_names, LogicalRepInfo::subconninfo, LogicalRepInfo::subname, subname, and LogicalRepInfos::two_phase.

Referenced by main().

◆ usage()

static void usage ( void  )
static

Definition at line 283 of file pg_createsubscriber.c.

284{
285 printf(_("%s creates a new logical replica from a standby server.\n\n"),
286 progname);
287 printf(_("Usage:\n"));
288 printf(_(" %s [OPTION]...\n"), progname);
289 printf(_("\nOptions:\n"));
290 printf(_(" -a, --all create subscriptions for all databases except template\n"
291 " databases and databases that don't allow connections\n"));
292 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
293 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
294 printf(_(" -l, --logdir=LOGDIR location for the log directory\n"));
295 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
296 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
297 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
298 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
299 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
300 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
301 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
302 printf(_(" -v, --verbose output verbose messages\n"));
303 printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
304 " databases on the subscriber; accepts: \"%s\"\n"), "publications");
305 printf(_(" --config-file=FILENAME use specified main server configuration\n"
306 " file when running target cluster\n"));
307 printf(_(" --publication=NAME publication name\n"));
308 printf(_(" --replication-slot=NAME replication slot name\n"));
309 printf(_(" --subscription=NAME subscription name\n"));
310 printf(_(" -V, --version output version information, then exit\n"));
311 printf(_(" -?, --help show this help, then exit\n"));
312 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
313 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
314}
#define _(x)
Definition elog.c:96
#define printf(...)
Definition port.h:267

References _, DEFAULT_SUB_PORT, fb(), printf, and progname.

Referenced by main().

◆ wait_for_end_recovery()

static void wait_for_end_recovery ( const char * conninfo,
const struct CreateSubscriberOptions * opt
)
static

Definition at line 1748 of file pg_createsubscriber.c.

1749{
1750 PGconn *conn;
1751 bool ready = false;
1752 int timer = 0;
1753
1754 pg_log_info("waiting for the target server to reach the consistent state");
1755
1756 conn = connect_database(conninfo, true);
1757
1758 for (;;)
1759 {
1760 /* Did the recovery process finish? We're done if so. */
1762 {
1763 ready = true;
1764 recovery_ended = true;
1765 break;
1766 }
1767
1768 /* Bail out after recovery_timeout seconds if this option is set */
1769 if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1770 {
1772 pg_log_error("recovery timed out");
1774 }
1775
1776 /* Keep waiting */
1779 }
1780
1781 disconnect_database(conn, false);
1782
1783 if (!ready)
1784 pg_fatal("server did not end recovery");
1785
1786 pg_log_info("target server reached the consistent state");
1787 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1788}
#define USECS_PER_SEC
Definition timestamp.h:134
#define pg_log_info_hint(...)
Definition logging.h:132
#define WAIT_INTERVAL
void pg_usleep(long microsec)
Definition signal.c:53

References conn, connect_database(), disconnect_database(), dry_run, fb(), pg_fatal, pg_log_error, pg_log_info, pg_log_info_hint, pg_usleep(), recovery_ended, CreateSubscriberOptions::recovery_timeout, server_is_in_recovery(), stop_standby_server(), subscriber_dir, USECS_PER_SEC, and WAIT_INTERVAL.

Referenced by main().

Variable Documentation

◆ dbinfos

◆ dry_run

◆ logdir

char* logdir = NULL
static

◆ num_dbs

◆ num_pubs

int num_pubs = 0
static

Definition at line 166 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_replslots

int num_replslots = 0
static

Definition at line 168 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_subs

int num_subs = 0
static

Definition at line 167 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ pg_ctl_path

char* pg_ctl_path = NULL
static

Definition at line 172 of file pg_createsubscriber.c.

Referenced by main(), start_standby_server(), and stop_standby_server().

◆ pg_resetwal_path

char* pg_resetwal_path = NULL
static

Definition at line 173 of file pg_createsubscriber.c.

Referenced by main(), and modify_subscriber_sysid().

◆ primary_slot_name

char* primary_slot_name = NULL
static

Definition at line 159 of file pg_createsubscriber.c.

Referenced by check_subscriber(), drop_primary_replication_slot(), and main().

◆ prng_state

pg_prng_state prng_state
static

Definition at line 170 of file pg_createsubscriber.c.

Referenced by generate_object_name(), and setup_publisher().

◆ progname

const char* progname
static

Definition at line 157 of file pg_createsubscriber.c.

Referenced by get_exec_path(), get_sub_conninfo(), main(), and usage().

◆ recovery_ended

bool recovery_ended = false
static

Definition at line 182 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and wait_for_end_recovery().

◆ recovery_params_set

bool recovery_params_set = false
static

Definition at line 184 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and setup_recovery().

◆ standby_running

bool standby_running = false
static

◆ subscriber_dir

◆ success

bool success = false
static

Definition at line 162 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and main().