PostgreSQL Source Code git master
Loading...
Searching...
No Matches
pg_createsubscriber.c File Reference
#include "postgres_fe.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/wait.h>
#include "common/connect.h"
#include "common/controldata_utils.h"
#include "common/file_perm.h"
#include "common/file_utils.h"
#include "common/logging.h"
#include "common/pg_prng.h"
#include "common/restricted_token.h"
#include "datatype/timestamp.h"
#include "fe_utils/recovery_gen.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
#include "fe_utils/version.h"
#include "getopt_long.h"
Include dependency graph for pg_createsubscriber.c:

Go to the source code of this file.

Data Structures

struct  CreateSubscriberOptions
 
struct  LogicalRepInfo
 
struct  LogicalRepInfos
 

Macros

#define DEFAULT_SUB_PORT   "50432"
 
#define OBJECTTYPE_PUBLICATIONS   0x0001
 
#define PG_AUTOCONF_FILENAME   "postgresql.auto.conf"
 
#define INCLUDED_CONF_FILE   "pg_createsubscriber.conf"
 
#define INCLUDED_CONF_FILE_DISABLED   INCLUDED_CONF_FILE ".disabled"
 
#define SERVER_LOG_FILE_NAME   "pg_createsubscriber_server.log"
 
#define INTERNAL_LOG_FILE_NAME   "pg_createsubscriber_internal.log"
 
#define WAIT_INTERVAL   1 /* 1 second */
 

Functions

static void cleanup_objects_atexit (void)
 
static void usage (void)
 
static char * get_base_conninfo (const char *conninfo, char **dbname)
 
static char * get_sub_conninfo (const struct CreateSubscriberOptions *opt)
 
static char * get_exec_path (const char *argv0, const char *progname)
 
static void check_data_directory (const char *datadir)
 
static char * concat_conninfo_dbname (const char *conninfo, const char *dbname)
 
static struct LogicalRepInfo * store_pub_sub_info (const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
 
static PGconn * connect_database (const char *conninfo, bool exit_on_error)
 
static void disconnect_database (PGconn *conn, bool exit_on_error)
 
static uint64 get_primary_sysid (const char *conninfo)
 
static uint64 get_standby_sysid (const char *datadir)
 
static void modify_subscriber_sysid (const struct CreateSubscriberOptions *opt)
 
static bool server_is_in_recovery (PGconn *conn)
 
static char * generate_object_name (PGconn *conn)
 
static void check_publisher (const struct LogicalRepInfo *dbinfo)
 
static char * setup_publisher (struct LogicalRepInfo *dbinfo)
 
static void check_subscriber (const struct LogicalRepInfo *dbinfo)
 
static void setup_subscriber (struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 
static void setup_recovery (const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
 
static void drop_primary_replication_slot (struct LogicalRepInfo *dbinfo, const char *slotname)
 
static void drop_failover_replication_slots (struct LogicalRepInfo *dbinfo)
 
static char * create_logical_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void drop_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
 
static void pg_ctl_status (const char *pg_ctl_cmd, int rc)
 
static void start_standby_server (const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
 
static void stop_standby_server (const char *datadir)
 
static void wait_for_end_recovery (const char *conninfo, const struct CreateSubscriberOptions *opt)
 
static void create_publication (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static bool find_publication (PGconn *conn, const char *pubname, const char *dbname)
 
static void drop_publication (PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
 
static void check_and_drop_publications (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void create_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void set_replication_progress (PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
 
static void enable_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void check_and_drop_existing_subscriptions (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void drop_existing_subscription (PGconn *conn, const char *subname, const char *dbname)
 
static void get_publisher_databases (struct CreateSubscriberOptions *opt, bool dbnamespecified)
 
static void report_createsub_log (enum pg_log_level, enum pg_log_part, const char *pg_restrict fmt,...) pg_attribute_printf(3, 4)
 
static void report_createsub_log_v (enum pg_log_level level, enum pg_log_part part, const char *pg_restrict fmt, va_list args) pg_attribute_printf(3, 0)
 
static pg_noreturn void report_createsub_fatal (const char *pg_restrict fmt,...) pg_attribute_printf(1, 2)
 
static void internal_log_file_write (enum pg_log_level level, enum pg_log_part part, const char *pg_restrict fmt, va_list args) pg_attribute_printf(3, 0)
 
static void appendConnStrItem (PQExpBuffer buf, const char *keyword, const char *val)
 
static FILE * logfile_open (const char *filename, const char *mode)
 
static void make_output_dirs (const char *log_basedir)
 
int main (int argc, char **argv)
 

Variables

static const char * progname
 
static char * primary_slot_name = NULL
 
static bool dry_run = false
 
static bool success = false
 
static struct LogicalRepInfos dbinfos
 
static int num_dbs = 0
 
static int num_pubs = 0
 
static int num_subs = 0
 
static int num_replslots = 0
 
static pg_prng_state prng_state
 
static char * pg_ctl_path = NULL
 
static char * pg_resetwal_path = NULL
 
static FILE * internal_log_file_fp = NULL
 
static char logdir [MAXPGPATH]
 
static char * subscriber_dir = NULL
 
static bool recovery_ended = false
 
static bool standby_running = false
 
static bool recovery_params_set = false
 

Macro Definition Documentation

◆ DEFAULT_SUB_PORT

#define DEFAULT_SUB_PORT   "50432"

Definition at line 35 of file pg_createsubscriber.c.

◆ INCLUDED_CONF_FILE

#define INCLUDED_CONF_FILE   "pg_createsubscriber.conf"

Definition at line 50 of file pg_createsubscriber.c.

◆ INCLUDED_CONF_FILE_DISABLED

#define INCLUDED_CONF_FILE_DISABLED   INCLUDED_CONF_FILE ".disabled"

Definition at line 51 of file pg_createsubscriber.c.

◆ INTERNAL_LOG_FILE_NAME

#define INTERNAL_LOG_FILE_NAME   "pg_createsubscriber_internal.log"

Definition at line 54 of file pg_createsubscriber.c.

◆ OBJECTTYPE_PUBLICATIONS

#define OBJECTTYPE_PUBLICATIONS   0x0001

Definition at line 36 of file pg_createsubscriber.c.

◆ PG_AUTOCONF_FILENAME

#define PG_AUTOCONF_FILENAME   "postgresql.auto.conf"

Definition at line 49 of file pg_createsubscriber.c.

◆ SERVER_LOG_FILE_NAME

#define SERVER_LOG_FILE_NAME   "pg_createsubscriber_server.log"

Definition at line 53 of file pg_createsubscriber.c.

◆ WAIT_INTERVAL

#define WAIT_INTERVAL   1 /* 1 second */

Definition at line 167 of file pg_createsubscriber.c.

Function Documentation

◆ appendConnStrItem()

static void appendConnStrItem ( PQExpBuffer  buf,
const char * keyword,
const char * val 
)
static

Definition at line 397 of file pg_createsubscriber.c.

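398{
399 if (buf->len > 0)
400 appendPQExpBufferChar(buf, ' ');
401 appendPQExpBufferStr(buf, keyword);
402 appendPQExpBufferChar(buf, '=');
403 appendConnStrVal(buf, val);
404}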
long val
Definition informix.c:689
static char buf[DEFAULT_XLOG_SEG_SIZE]
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
void appendConnStrVal(PQExpBuffer buf, const char *str)

References appendConnStrVal(), appendPQExpBufferChar(), appendPQExpBufferStr(), buf, and val.

Referenced by concat_conninfo_dbname(), get_base_conninfo(), and get_sub_conninfo().

◆ check_and_drop_existing_subscriptions()

static void check_and_drop_existing_subscriptions ( PGconn * conn,
const struct LogicalRepInfo * dbinfo 
)
static

Definition at line 1500 of file pg_createsubscriber.c.

1502{
1504 char *dbname;
1505 PGresult *res;
1506
1507 Assert(conn != NULL);
1508
1509 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1510
1511 appendPQExpBuffer(query,
1512 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1513 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1514 "WHERE d.datname = %s",
1515 dbname);
1516 res = PQexec(conn, query->data);
1517
1518 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1519 {
1521 "could not obtain pre-existing subscriptions: %s",
1524 }
1525
1526 for (int i = 0; i < PQntuples(res); i++)
1528 dbinfo->dbname);
1529
1530 PQclear(res);
1531 destroyPQExpBuffer(query);
1533}
#define Assert(condition)
Definition c.h:943
void PQfreemem(void *ptr)
Definition fe-exec.c:4049
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition fe-exec.c:4399
PGresult * PQexec(PGconn *conn, const char *query)
Definition fe-exec.c:2279
int i
Definition isn.c:77
#define PQresultErrorMessage
#define PQgetvalue
#define PQclear
#define PQresultStatus
#define PQntuples
@ PGRES_TUPLES_OK
Definition libpq-fe.h:134
@ PG_LOG_PRIMARY
Definition logging.h:67
@ PG_LOG_ERROR
Definition logging.h:43
static void report_createsub_log(enum pg_log_level, enum pg_log_part, const char *pg_restrict fmt,...) pg_attribute_printf(3
static void drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
static void disconnect_database(PGconn *conn, bool exit_on_error)
PQExpBuffer createPQExpBuffer(void)
Definition pqexpbuffer.c:72
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
static int fb(int x)
char * dbname
Definition streamutil.c:49
PGconn * conn
Definition streamutil.c:52

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), PQExpBufferData::data, LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), drop_existing_subscription(), fb(), i, PG_LOG_ERROR, PG_LOG_PRIMARY, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, and report_createsub_log().

Referenced by setup_subscriber().

◆ check_and_drop_publications()

static void check_and_drop_publications ( PGconn * conn,
struct LogicalRepInfo * dbinfo 
)
static

Definition at line 2167 of file pg_createsubscriber.c.

2168{
2169 PGresult *res;
2171
2172 Assert(conn != NULL);
2173
2174 if (drop_all_pubs)
2175 {
2177 "dropping all existing publications in database \"%s\"",
2178 dbinfo->dbname);
2179
2180 /* Fetch all publication names */
2181 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
2182 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2183 {
2185 "could not obtain publication information: %s",
2187 PQclear(res);
2189 }
2190
2191 /* Drop each publication */
2192 for (int i = 0; i < PQntuples(res); i++)
2193 drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
2194 &dbinfo->made_publication);
2195
2196 PQclear(res);
2197 }
2198 else
2199 {
2200 /* Drop publication only if it was created by this tool */
2201 if (dbinfo->made_publication)
2202 {
2203 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
2204 &dbinfo->made_publication);
2205 }
2206 else
2207 {
2208 if (dry_run)
2210 "dry-run: would preserve existing publication \"%s\" in database \"%s\"",
2211 dbinfo->pubname, dbinfo->dbname);
2212 else
2214 "preserve existing publication \"%s\" in database \"%s\"",
2215 dbinfo->pubname, dbinfo->dbname);
2216 }
2217 }
2218}
@ PG_LOG_INFO
Definition logging.h:33
static struct LogicalRepInfos dbinfos
static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
static bool dry_run
#define OBJECTTYPE_PUBLICATIONS

References Assert, conn, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), dry_run, fb(), i, LogicalRepInfo::made_publication, OBJECTTYPE_PUBLICATIONS, LogicalRepInfos::objecttypes_to_clean, PG_LOG_ERROR, PG_LOG_INFO, PG_LOG_PRIMARY, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, LogicalRepInfo::pubname, and report_createsub_log().

Referenced by setup_subscriber().

◆ check_data_directory()

static void check_data_directory ( const char * datadir)
static

Definition at line 527 of file pg_createsubscriber.c.

528{
529 struct stat statbuf;
530 uint32 major_version;
531 char *version_str;
532
534 "checking if directory \"%s\" is a cluster data directory",
535 datadir);
536
537 if (stat(datadir, &statbuf) != 0)
538 {
539 if (errno == ENOENT)
540 report_createsub_fatal("data directory \"%s\" does not exist", datadir);
541 else
542 report_createsub_fatal("could not access directory \"%s\": %m", datadir);
543 }
544
545 /*
546 * Retrieve the contents of this cluster's PG_VERSION. We require
547 * compatibility with the same major version as the one this tool is
548 * compiled with.
549 */
551 if (major_version != PG_MAJORVERSION_NUM)
552 {
554 "data directory is of wrong version");
556 "File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
557 "PG_VERSION", version_str, PG_MAJORVERSION);
558 exit(1);
559 }
560}
uint32_t uint32
Definition c.h:624
uint32 get_pg_version(const char *datadir, char **version_str)
Definition version.c:44
@ PG_LOG_DETAIL
Definition logging.h:73
static void static void static pg_noreturn void report_createsub_fatal(const char *pg_restrict fmt,...) pg_attribute_printf(1
char * datadir
#define GET_PG_MAJORVERSION_NUM(v)
Definition version.h:19
#define stat
Definition win32_port.h:74

References datadir, fb(), GET_PG_MAJORVERSION_NUM, get_pg_version(), PG_LOG_DETAIL, PG_LOG_ERROR, PG_LOG_INFO, PG_LOG_PRIMARY, report_createsub_fatal(), report_createsub_log(), and stat.

Referenced by main().

◆ check_publisher()

static void check_publisher ( const struct LogicalRepInfo * dbinfo)
static

Definition at line 1180 of file pg_createsubscriber.c.

1181{
1182 PGconn *conn;
1183 PGresult *res;
1184 bool failed = false;
1185
1186 char *wal_level;
1187 int max_repslots;
1188 int cur_repslots;
1189 int max_walsenders;
1190 int cur_walsenders;
1193
1195 "checking settings on publisher");
1196
1197 conn = connect_database(dbinfo[0].pubconninfo, true);
1198
1199 /*
1200 * If the primary server is in recovery (i.e. cascading replication),
1201 * objects (publication) cannot be created because it is read only.
1202 */
1204 {
1206 "primary server cannot be in recovery");
1208 }
1209
1210 /*------------------------------------------------------------------------
1211 * Logical replication requires a few parameters to be set on publisher.
1212 * Since these parameters are not a requirement for physical replication,
1213 * we should check it to make sure it won't fail.
1214 *
1215 * - wal_level >= replica
1216 * - max_replication_slots >= current + number of dbs to be converted
1217 * - max_wal_senders >= current + number of dbs to be converted
1218 * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
1219 * -----------------------------------------------------------------------
1220 */
1221 res = PQexec(conn,
1222 "SELECT pg_catalog.current_setting('wal_level'),"
1223 " pg_catalog.current_setting('max_replication_slots'),"
1224 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
1225 " pg_catalog.current_setting('max_wal_senders'),"
1226 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
1227 " pg_catalog.current_setting('max_prepared_transactions'),"
1228 " pg_catalog.current_setting('max_slot_wal_keep_size')");
1229
1230 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1231 {
1233 "could not obtain publisher settings: %s",
1236 }
1237
1238 wal_level = pg_strdup(PQgetvalue(res, 0, 0));
1239 max_repslots = atoi(PQgetvalue(res, 0, 1));
1240 cur_repslots = atoi(PQgetvalue(res, 0, 2));
1241 max_walsenders = atoi(PQgetvalue(res, 0, 3));
1242 cur_walsenders = atoi(PQgetvalue(res, 0, 4));
1245
1246 PQclear(res);
1247
1249 "publisher: wal_level: %s", wal_level);
1251 "publisher: max_replication_slots: %d", max_repslots);
1253 "publisher: current replication slots: %d", cur_repslots);
1255 "publisher: max_wal_senders: %d", max_walsenders);
1257 "publisher: current wal senders: %d", cur_walsenders);
1259 "publisher: max_prepared_transactions: %d",
1262 "publisher: max_slot_wal_keep_size: %s",
1264
1265 disconnect_database(conn, false);
1266
1267 if (strcmp(wal_level, "minimal") == 0)
1268 {
1270 "publisher requires \"wal_level\" >= \"replica\"");
1271 failed = true;
1272 }
1273
1275 {
1277 "publisher requires %d replication slots, but only %d remain",
1280 "Increase the configuration parameter \"%s\" to at least %d.",
1281 "max_replication_slots", cur_repslots + num_dbs);
1282 failed = true;
1283 }
1284
1286 {
1288 "publisher requires %d WAL sender processes, but only %d remain",
1291 "Increase the configuration parameter \"%s\" to at least %d.",
1292 "max_wal_senders", cur_walsenders + num_dbs);
1293 failed = true;
1294 }
1295
1297 {
1299 "two_phase option will not be enabled for replication slots");
1301 "Subscriptions will be created with the two_phase option disabled. "
1302 "Prepared transactions will be replicated at COMMIT PREPARED.");
1304 "You can use the command-line option --enable-two-phase to enable two_phase.");
1305 }
1306
1307 /*
1308 * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
1309 * is set to a non-default value, it may cause replication failures due to
1310 * required WAL files being prematurely removed.
1311 */
1312 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
1313 {
1315 "required WAL could be removed from the publisher");
1317 "Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1318 "max_slot_wal_keep_size");
1319 }
1320
1322
1323 if (failed)
1324 exit(1);
1325}
char * pg_strdup(const char *in)
Definition fe_memutils.c:85
void pg_free(void *ptr)
@ PG_LOG_HINT
Definition logging.h:79
@ PG_LOG_DEBUG
Definition logging.h:26
@ PG_LOG_WARNING
Definition logging.h:38
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
int wal_level
Definition xlog.c:138

References conn, connect_database(), dbinfos, disconnect_database(), dry_run, fb(), num_dbs, pg_free(), PG_LOG_DEBUG, PG_LOG_DETAIL, PG_LOG_ERROR, PG_LOG_HINT, PG_LOG_INFO, PG_LOG_PRIMARY, PG_LOG_WARNING, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, PQresultStatus, report_createsub_log(), server_is_in_recovery(), LogicalRepInfos::two_phase, and wal_level.

Referenced by main().

◆ check_subscriber()

static void check_subscriber ( const struct LogicalRepInfo * dbinfo)
static

Definition at line 1339 of file pg_createsubscriber.c.

1340{
1341 PGconn *conn;
1342 PGresult *res;
1343 bool failed = false;
1344
1345 int max_lrworkers;
1346 int max_replorigins;
1347 int max_wprocs;
1348
1350 "checking settings on subscriber");
1351
1352 conn = connect_database(dbinfo[0].subconninfo, true);
1353
1354 /* The target server must be a standby */
1356 {
1358 "target server must be a standby");
1360 }
1361
1362 /*------------------------------------------------------------------------
1363 * Logical replication requires a few parameters to be set on subscriber.
1364 * Since these parameters are not a requirement for physical replication,
1365 * we should check it to make sure it won't fail.
1366 *
1367 * - max_active_replication_origins >= number of dbs to be converted
1368 * - max_logical_replication_workers >= number of dbs to be converted
1369 * - max_worker_processes >= 1 + number of dbs to be converted
1370 *------------------------------------------------------------------------
1371 */
1372 res = PQexec(conn,
1373 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1374 "'max_logical_replication_workers', "
1375 "'max_active_replication_origins', "
1376 "'max_worker_processes', "
1377 "'primary_slot_name') "
1378 "ORDER BY name");
1379
1380 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1381 {
1383 "could not obtain subscriber settings: %s",
1386 }
1387
1388 max_replorigins = atoi(PQgetvalue(res, 0, 0));
1389 max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1390 max_wprocs = atoi(PQgetvalue(res, 2, 0));
1391 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1393
1395 "subscriber: max_logical_replication_workers: %d",
1398 "subscriber: max_active_replication_origins: %d", max_replorigins);
1400 "subscriber: max_worker_processes: %d", max_wprocs);
1403 "subscriber: primary_slot_name: %s", primary_slot_name);
1404
1405 PQclear(res);
1406
1407 disconnect_database(conn, false);
1408
1410 {
1412 "subscriber requires %d active replication origins, but only %d remain",
1415 "Increase the configuration parameter \"%s\" to at least %d.",
1416 "max_active_replication_origins", num_dbs);
1417 failed = true;
1418 }
1419
1420 if (max_lrworkers < num_dbs)
1421 {
1423 "subscriber requires %d logical replication workers, but only %d remain",
1426 "Increase the configuration parameter \"%s\" to at least %d.",
1427 "max_logical_replication_workers", num_dbs);
1428 failed = true;
1429 }
1430
1431 if (max_wprocs < num_dbs + 1)
1432 {
1434 "subscriber requires %d worker processes, but only %d remain",
1435 num_dbs + 1, max_wprocs);
1437 "Increase the configuration parameter \"%s\" to at least %d.",
1438 "max_worker_processes", num_dbs + 1);
1439 failed = true;
1440 }
1441
1442 if (failed)
1443 exit(1);
1444}
static char * primary_slot_name

References conn, connect_database(), disconnect_database(), fb(), num_dbs, PG_LOG_DEBUG, PG_LOG_ERROR, PG_LOG_HINT, PG_LOG_INFO, PG_LOG_PRIMARY, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, PQresultStatus, primary_slot_name, report_createsub_log(), and server_is_in_recovery().

Referenced by main().

◆ cleanup_objects_atexit()

static void cleanup_objects_atexit ( void  )
static

Definition at line 269 of file pg_createsubscriber.c.

270{
271 /* Rename the included configuration file, if necessary. */
273 {
276
281
283 {
284 /* durable_rename() has already logged something. */
286 "A manual removal of the recovery parameters may be required.");
287 }
288 }
289
290 if (success)
291 return;
292
293 /*
294 * If the server is promoted, there is no way to use the current setup
295 * again. Warn the user that a new replication setup should be done before
296 * trying again.
297 */
298 if (recovery_ended)
299 {
301 "failed after the end of recovery");
303 "The target server cannot be used as a physical replica anymore. "
304 "You must recreate the physical replica before continuing.");
305 }
306
307 for (int i = 0; i < num_dbs; i++)
308 {
309 struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
310
311 if (dbinfo->made_publication || dbinfo->made_replslot)
312 {
313 PGconn *conn;
314
315 conn = connect_database(dbinfo->pubconninfo, false);
316 if (conn != NULL)
317 {
318 if (dbinfo->made_publication)
319 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
320 &dbinfo->made_publication);
321 if (dbinfo->made_replslot)
322 drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
324 }
325 else
326 {
327 /*
328 * If a connection could not be established, inform the user
329 * that some objects were left on primary and should be
330 * removed before trying again.
331 */
332 if (dbinfo->made_publication)
333 {
335 "publication \"%s\" created in database \"%s\" on primary was left behind",
336 dbinfo->pubname,
337 dbinfo->dbname);
339 "Drop this publication before trying again.");
340 }
341 if (dbinfo->made_replslot)
342 {
344 "replication slot \"%s\" created in database \"%s\" on primary was left behind",
345 dbinfo->replslotname,
346 dbinfo->dbname);
348 "Drop this replication slot soon to avoid retention of WAL files.");
349 }
350 }
351 }
352 }
353
354 if (standby_running)
356}
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition fd.c:783
#define MAXPGPATH
static void stop_standby_server(const char *datadir)
static char * subscriber_dir
#define INCLUDED_CONF_FILE
static bool success
static bool recovery_ended
#define INCLUDED_CONF_FILE_DISABLED
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static bool recovery_params_set
#define snprintf
Definition port.h:260
struct LogicalRepInfo * dbinfo

References conn, connect_database(), LogicalRepInfos::dbinfo, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), drop_replication_slot(), durable_rename(), fb(), i, INCLUDED_CONF_FILE, INCLUDED_CONF_FILE_DISABLED, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, MAXPGPATH, num_dbs, PG_LOG_HINT, PG_LOG_PRIMARY, PG_LOG_WARNING, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, recovery_ended, recovery_params_set, LogicalRepInfo::replslotname, report_createsub_log(), snprintf, standby_running, stop_standby_server(), subscriber_dir, and success.

Referenced by main().

◆ concat_conninfo_dbname()

static char * concat_conninfo_dbname ( const char * conninfo,
const char * dbname 
)
static

Definition at line 570 of file pg_createsubscriber.c.

571{
572 PQExpBuffer buf = createPQExpBuffer();
573 char *ret;
574
575 Assert(conninfo != NULL);
576
577 appendPQExpBufferStr(buf, conninfo);
578 appendConnStrItem(buf, "dbname", dbname);
579
580 ret = pg_strdup(buf->data);
581 destroyPQExpBuffer(buf);
582
583 return ret;
584}
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)

References appendConnStrItem(), appendPQExpBufferStr(), Assert, buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), fb(), and pg_strdup().

Referenced by get_publisher_databases(), and store_pub_sub_info().

◆ connect_database()

static PGconn * connect_database ( const char * conninfo,
bool  exit_on_error 
)
static

Definition at line 669 of file pg_createsubscriber.c.

670{
671 PGconn *conn;
672 PGresult *res;
673
674 conn = PQconnectdb(conninfo);
676 {
678 "connection to database failed: %s",
680 PQfinish(conn);
681
682 if (exit_on_error)
683 exit(1);
684 return NULL;
685 }
686
687 /* Secure search_path */
690 {
692 "could not clear \"search_path\": %s",
694 PQclear(res);
695 PQfinish(conn);
696
697 if (exit_on_error)
698 exit(1);
699 return NULL;
700 }
701 PQclear(res);
702
703 return conn;
704}
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition connect.h:25
PGconn * PQconnectdb(const char *conninfo)
Definition fe-connect.c:830
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
@ CONNECTION_OK
Definition libpq-fe.h:90

References ALWAYS_SECURE_SEARCH_PATH_SQL, conn, CONNECTION_OK, fb(), PG_LOG_ERROR, PG_LOG_PRIMARY, PGRES_TUPLES_OK, PQclear, PQconnectdb(), PQerrorMessage(), PQexec(), PQfinish(), PQresultErrorMessage, PQresultStatus, PQstatus(), and report_createsub_log().

Referenced by check_publisher(), check_subscriber(), cleanup_objects_atexit(), drop_failover_replication_slots(), drop_primary_replication_slot(), get_primary_sysid(), get_publisher_databases(), setup_publisher(), setup_recovery(), setup_subscriber(), and wait_for_end_recovery().

◆ create_logical_replication_slot()

static char * create_logical_replication_slot ( PGconn * conn,
struct LogicalRepInfo * dbinfo 
)
static

Definition at line 1750 of file pg_createsubscriber.c.

1751{
1753 PGresult *res = NULL;
1754 const char *slot_name = dbinfo->replslotname;
1755 char *slot_name_esc;
1756 char *lsn = NULL;
1757
1758 Assert(conn != NULL);
1759
1760 if (dry_run)
1762 "dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1763 slot_name, dbinfo->dbname);
1764 else
1766 "creating the replication slot \"%s\" in database \"%s\" on publisher",
1767 slot_name, dbinfo->dbname);
1768
1769 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1770
1772 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1774 dbinfos.two_phase ? "true" : "false");
1775
1777
1779 "command is: %s", str->data);
1780
1781 if (!dry_run)
1782 {
1783 res = PQexec(conn, str->data);
1784 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1785 {
1787 "could not create replication slot \"%s\" in database \"%s\": %s",
1788 slot_name, dbinfo->dbname,
1790 PQclear(res);
1792 return NULL;
1793 }
1794
1795 lsn = pg_strdup(PQgetvalue(res, 0, 0));
1796 PQclear(res);
1797 }
1798
1799 /* For cleanup purposes */
1800 dbinfo->made_replslot = true;
1801
1803
1804 return lsn;
1805}
const char * str

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, fb(), LogicalRepInfo::made_replslot, PG_LOG_DEBUG, PG_LOG_ERROR, PG_LOG_INFO, PG_LOG_PRIMARY, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQresultErrorMessage, PQresultStatus, LogicalRepInfo::replslotname, report_createsub_log(), str, and LogicalRepInfos::two_phase.

Referenced by setup_publisher().

◆ create_publication()

static void create_publication ( PGconn * conn,
struct LogicalRepInfo * dbinfo 
)
static

Definition at line 2022 of file pg_createsubscriber.c.

2023{
2025 PGresult *res;
2026 char *ipubname_esc;
2027 char *spubname_esc;
2028
2029 Assert(conn != NULL);
2030
2032 spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
2033
2034 /* Check if the publication already exists */
2036 "SELECT 1 FROM pg_catalog.pg_publication "
2037 "WHERE pubname = %s",
2038 spubname_esc);
2039 res = PQexec(conn, str->data);
2040 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2041 {
2043 "could not obtain publication information: %s",
2046 }
2047
2048 if (PQntuples(res) == 1)
2049 {
2050 /*
2051 * Unfortunately, if it reaches this code path, it will always fail
2052 * (unless you decide to change the existing publication name). That's
2053 * bad but it is very unlikely that the user will choose a name with
2054 * pg_createsubscriber_ prefix followed by the exact database oid and
2055 * a random number.
2056 */
2058 "publication \"%s\" already exists", dbinfo->pubname);
2060 "Consider renaming this publication before continuing.");
2062 }
2063
2064 PQclear(res);
2066
2067 if (dry_run)
2069 "dry-run: would create publication \"%s\" in database \"%s\"",
2070 dbinfo->pubname, dbinfo->dbname);
2071 else
2073 "creating publication \"%s\" in database \"%s\"",
2074 dbinfo->pubname, dbinfo->dbname);
2075
2076 appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
2077 ipubname_esc);
2078
2080 "command is: %s", str->data);
2081
2082 if (!dry_run)
2083 {
2084 res = PQexec(conn, str->data);
2085 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2086 {
2088 "could not create publication \"%s\" in database \"%s\": %s",
2089 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
2091 }
2092 PQclear(res);
2093 }
2094
2095 /* For cleanup purposes */
2096 dbinfo->made_publication = true;
2097
2101}
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition fe-exec.c:4405
@ PGRES_COMMAND_OK
Definition libpq-fe.h:131
void resetPQExpBuffer(PQExpBuffer str)

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), LogicalRepInfo::made_publication, PG_LOG_DEBUG, PG_LOG_ERROR, PG_LOG_HINT, PG_LOG_INFO, PG_LOG_PRIMARY, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear, PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQntuples, PQresultErrorMessage, PQresultStatus, LogicalRepInfo::pubname, report_createsub_log(), resetPQExpBuffer(), and str.

Referenced by setup_publisher().

◆ create_subscription()

static void create_subscription ( PGconn * conn,
const struct LogicalRepInfo * dbinfo 
)
static

Definition at line 2232 of file pg_createsubscriber.c.

2233{
2235 PGresult *res;
2236 char *pubname_esc;
2237 char *subname_esc;
2238 char *pubconninfo_esc;
2239 char *replslotname_esc;
2240
2241 Assert(conn != NULL);
2242
2247
2248 if (dry_run)
2250 "dry-run: would create subscription \"%s\" in database \"%s\"",
2251 dbinfo->subname, dbinfo->dbname);
2252 else
2254 "creating subscription \"%s\" in database \"%s\"",
2255 dbinfo->subname, dbinfo->dbname);
2256
2258 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
2259 "WITH (create_slot = false, enabled = false, "
2260 "slot_name = %s, copy_data = false, two_phase = %s)",
2262 dbinfos.two_phase ? "true" : "false");
2263
2268
2270 "command is: %s", str->data);
2271
2272 if (!dry_run)
2273 {
2274 res = PQexec(conn, str->data);
2275 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2276 {
2278 "could not create subscription \"%s\" in database \"%s\": %s",
2279 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
2281 }
2282 PQclear(res);
2283 }
2284
2286}

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), PG_LOG_DEBUG, PG_LOG_ERROR, PG_LOG_INFO, PG_LOG_PRIMARY, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, report_createsub_log(), str, LogicalRepInfo::subname, and LogicalRepInfos::two_phase.

Referenced by setup_subscriber().

◆ disconnect_database()

◆ drop_existing_subscription()

static void drop_existing_subscription ( PGconn *conn,
const char *subname,
const char *dbname 
)
static

Definition at line 1453 of file pg_createsubscriber.c.

1454{
1456 PGresult *res;
1457
1458 Assert(conn != NULL);
1459
1460 /*
1461 * Construct a query string. These commands are allowed to be executed
1462 * within a transaction.
1463 */
1464 appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1465 subname);
1466 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1467 subname);
1468 appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1469
1470 if (dry_run)
1472 "dry-run: would drop subscription \"%s\" in database \"%s\"",
1473 subname, dbname);
1474 else
1475 {
1477 "dropping subscription \"%s\" in database \"%s\"",
1478 subname, dbname);
1479
1480 res = PQexec(conn, query->data);
1481
1482 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1483 {
1485 "could not drop subscription \"%s\": %s",
1488 }
1489
1490 PQclear(res);
1491 }
1492
1493 destroyPQExpBuffer(query);
1494}
NameData subname

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), PQExpBufferData::data, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), PG_LOG_ERROR, PG_LOG_INFO, PG_LOG_PRIMARY, PGRES_COMMAND_OK, PQclear, PQexec(), PQresultErrorMessage, PQresultStatus, report_createsub_log(), and subname.

Referenced by check_and_drop_existing_subscriptions().

◆ drop_failover_replication_slots()

static void drop_failover_replication_slots ( struct LogicalRepInfo *dbinfo)
static

Definition at line 1704 of file pg_createsubscriber.c.

1705{
1706 PGconn *conn;
1707 PGresult *res;
1708
1709 conn = connect_database(dbinfo[0].subconninfo, false);
1710 if (conn != NULL)
1711 {
1712 /* Get failover replication slot names */
1713 res = PQexec(conn,
1714 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1715
1716 if (PQresultStatus(res) == PGRES_TUPLES_OK)
1717 {
1718 /* Remove failover replication slots from subscriber */
1719 for (int i = 0; i < PQntuples(res); i++)
1720 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1721 }
1722 else
1723 {
1725 "could not obtain failover replication slot information: %s",
1728 "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1729 }
1730
1731 PQclear(res);
1732 disconnect_database(conn, false);
1733 }
1734 else
1735 {
1737 "could not drop failover replication slot");
1739 "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1740 }
1741}

References conn, connect_database(), disconnect_database(), drop_replication_slot(), fb(), i, PG_LOG_HINT, PG_LOG_PRIMARY, PG_LOG_WARNING, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, and report_createsub_log().

Referenced by main().

◆ drop_primary_replication_slot()

static void drop_primary_replication_slot ( struct LogicalRepInfo *dbinfo,
const char *slotname 
)
static

Definition at line 1672 of file pg_createsubscriber.c.

1673{
1674 PGconn *conn;
1675
1676 /* Replication slot does not exist, do nothing */
1677 if (!primary_slot_name)
1678 return;
1679
1680 conn = connect_database(dbinfo[0].pubconninfo, false);
1681 if (conn != NULL)
1682 {
1683 drop_replication_slot(conn, &dbinfo[0], slotname);
1684 disconnect_database(conn, false);
1685 }
1686 else
1687 {
1689 "could not drop replication slot \"%s\" on primary",
1690 slotname);
1692 "Drop this replication slot soon to avoid retention of WAL files.");
1693 }
1694}

References conn, connect_database(), disconnect_database(), drop_replication_slot(), fb(), PG_LOG_HINT, PG_LOG_PRIMARY, PG_LOG_WARNING, primary_slot_name, and report_createsub_log().

Referenced by main().

◆ drop_publication()

static void drop_publication ( PGconn *conn,
const char *pubname,
const char *dbname,
bool *made_publication 
)
static

Definition at line 2107 of file pg_createsubscriber.c.

2109{
2111 PGresult *res;
2112 char *pubname_esc;
2113
2114 Assert(conn != NULL);
2115
2116 pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
2117
2118 if (dry_run)
2120 "dry-run: would drop publication \"%s\" in database \"%s\"",
2121 pubname, dbname);
2122 else
2124 "dropping publication \"%s\" in database \"%s\"",
2125 pubname, dbname);
2126
2127 appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
2128
2130
2132 "command is: %s", str->data);
2133
2134 if (!dry_run)
2135 {
2136 res = PQexec(conn, str->data);
2137 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2138 {
2140 "could not drop publication \"%s\" in database \"%s\": %s",
2141 pubname, dbname, PQresultErrorMessage(res));
2142 *made_publication = false; /* don't try again. */
2143
2144 /*
2145 * Don't disconnect and exit here. This routine is used by primary
2146 * (cleanup publication / replication slot due to an error) and
2147 * subscriber (remove the replicated publications). In both cases,
2148 * it can continue and provide instructions for the user to remove
2149 * it later if cleanup fails.
2150 */
2151 }
2152 PQclear(res);
2153 }
2154
2156}

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbname, destroyPQExpBuffer(), dry_run, fb(), PG_LOG_DEBUG, PG_LOG_ERROR, PG_LOG_INFO, PG_LOG_PRIMARY, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, report_createsub_log(), and str.

Referenced by check_and_drop_publications(), and cleanup_objects_atexit().

◆ drop_replication_slot()

static void drop_replication_slot ( PGconn *conn,
struct LogicalRepInfo *dbinfo,
const char *slot_name 
)
static

Definition at line 1808 of file pg_createsubscriber.c.

1810{
1812 char *slot_name_esc;
1813 PGresult *res;
1814
1815 Assert(conn != NULL);
1816
1817 if (dry_run)
1819 "dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1820 slot_name, dbinfo->dbname);
1821 else
1823 "dropping the replication slot \"%s\" in database \"%s\"",
1824 slot_name, dbinfo->dbname);
1825
1826 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1827
1828 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1829
1831
1833 "command is: %s", str->data);
1834
1835 if (!dry_run)
1836 {
1837 res = PQexec(conn, str->data);
1838 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1839 {
1841 "could not drop replication slot \"%s\" in database \"%s\": %s",
1842 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1843 dbinfo->made_replslot = false; /* don't try again. */
1844 }
1845
1846 PQclear(res);
1847 }
1848
1850}

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, fb(), LogicalRepInfo::made_replslot, PG_LOG_DEBUG, PG_LOG_ERROR, PG_LOG_INFO, PG_LOG_PRIMARY, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, report_createsub_log(), and str.

Referenced by cleanup_objects_atexit(), drop_failover_replication_slots(), and drop_primary_replication_slot().

◆ enable_subscription()

static void enable_subscription ( PGconn *conn,
const struct LogicalRepInfo *dbinfo 
)
static

Definition at line 2400 of file pg_createsubscriber.c.

2401{
2403 PGresult *res;
2404 char *subname;
2405
2406 Assert(conn != NULL);
2407
2408 subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2409
2410 if (dry_run)
2412 "dry-run: would enable subscription \"%s\" in database \"%s\"",
2413 dbinfo->subname, dbinfo->dbname);
2414 else
2416 "enabling subscription \"%s\" in database \"%s\"",
2417 dbinfo->subname, dbinfo->dbname);
2418
2419 appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
2420
2422 "command is: %s", str->data);
2423
2424 if (!dry_run)
2425 {
2426 res = PQexec(conn, str->data);
2427 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2428 {
2430 "could not enable subscription \"%s\": %s",
2431 dbinfo->subname, PQresultErrorMessage(res));
2433 }
2434
2435 PQclear(res);
2436 }
2437
2440}

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), PG_LOG_DEBUG, PG_LOG_ERROR, PG_LOG_INFO, PG_LOG_PRIMARY, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, report_createsub_log(), str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ find_publication()

static bool find_publication ( PGconn *conn,
const char *pubname,
const char *dbname 
)
static

Definition at line 931 of file pg_createsubscriber.c.

932{
934 PGresult *res;
935 bool found = false;
936 char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
937
939 "SELECT 1 FROM pg_catalog.pg_publication "
940 "WHERE pubname = %s",
942 res = PQexec(conn, str->data);
944 {
946 "could not find publication \"%s\" in database \"%s\": %s",
947 pubname, dbname, PQerrorMessage(conn));
949 }
950
951 if (PQntuples(res) == 1)
952 found = true;
953
954 PQclear(res);
957
958 return found;
959}

References appendPQExpBuffer(), conn, createPQExpBuffer(), dbname, destroyPQExpBuffer(), disconnect_database(), fb(), PG_LOG_ERROR, PG_LOG_PRIMARY, PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQescapeLiteral(), PQexec(), PQfreemem(), PQntuples, PQresultStatus, report_createsub_log(), and str.

Referenced by setup_publisher().

◆ generate_object_name()

static char * generate_object_name ( PGconn *conn)
static

Definition at line 883 of file pg_createsubscriber.c.

884{
885 PGresult *res;
886 Oid oid;
887 uint32 rand;
888 char *objname;
889
890 res = PQexec(conn,
891 "SELECT oid FROM pg_catalog.pg_database "
892 "WHERE datname = pg_catalog.current_database()");
894 {
896 "could not obtain database OID: %s",
899 }
900
901 if (PQntuples(res) != 1)
902 {
904 "could not obtain database OID: got %d rows, expected %d row",
905 PQntuples(res), 1);
907 }
908
909 /* Database OID */
910 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
911
912 PQclear(res);
913
914 /* Random unsigned integer */
916
917 /*
918 * Build the object name. The name must not exceed NAMEDATALEN - 1. This
919 * current naming scheme uses a maximum of 40 characters (20 + 10 + 1 + 8 +
920 * '\0').
921 */
922 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
923
924 return objname;
925}
static pg_prng_state prng_state
uint32 pg_prng_uint32(pg_prng_state *state)
Definition pg_prng.c:227
unsigned int Oid
char * psprintf(const char *fmt,...)
Definition psprintf.c:43

References conn, disconnect_database(), fb(), PG_LOG_ERROR, PG_LOG_PRIMARY, pg_prng_uint32(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, prng_state, psprintf(), and report_createsub_log().

Referenced by setup_publisher().

◆ get_base_conninfo()

static char * get_base_conninfo ( const char *conninfo,
char **  dbname 
)
static

Definition at line 419 of file pg_createsubscriber.c.

420{
424 char *errmsg = NULL;
425 char *ret;
426
427 conn_opts = PQconninfoParse(conninfo, &errmsg);
428 if (conn_opts == NULL)
429 {
431 "could not parse connection string: %s", errmsg);
433 return NULL;
434 }
435
437 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
438 {
439 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
440 {
441 if (strcmp(conn_opt->keyword, "dbname") == 0)
442 {
443 if (dbname)
444 *dbname = pg_strdup(conn_opt->val);
445 continue;
446 }
447 appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
448 }
449 }
450
451 ret = pg_strdup(buf->data);
452
455
456 return ret;
457}
void PQconninfoFree(PQconninfoOption *connOptions)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
static char * errmsg

References appendConnStrItem(), buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), errmsg, fb(), PG_LOG_ERROR, PG_LOG_PRIMARY, pg_strdup(), PQconninfoFree(), PQconninfoParse(), PQfreemem(), and report_createsub_log().

Referenced by main().

◆ get_exec_path()

static char * get_exec_path ( const char *argv0,
const char *progname 
)
static

Definition at line 490 of file pg_createsubscriber.c.

491{
492 char *versionstr;
493 char *exec_path;
494 int ret;
495
496 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
499
500 if (ret < 0)
501 {
502 char full_path[MAXPGPATH];
503
504 if (find_my_exec(argv0, full_path) < 0)
506
507 if (ret == -1)
508 report_createsub_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
509 progname, "pg_createsubscriber", full_path);
510 else
511 report_createsub_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
512 progname, full_path, "pg_createsubscriber");
513 }
514
516 "%s path is: %s", progname, exec_path);
517
518 return exec_path;
519}
int find_my_exec(const char *argv0, char *retpath)
Definition exec.c:161
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition exec.c:311
void * pg_malloc(size_t size)
Definition fe_memutils.c:47
static const char * progname
static char * argv0
Definition pg_ctl.c:94
static char * exec_path
Definition pg_ctl.c:89
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition strlcpy.c:45

References argv0, exec_path, fb(), find_my_exec(), find_other_exec(), MAXPGPATH, PG_LOG_DEBUG, PG_LOG_PRIMARY, pg_malloc(), progname, psprintf(), report_createsub_fatal(), report_createsub_log(), and strlcpy().

Referenced by main().

◆ get_primary_sysid()

static uint64 get_primary_sysid ( const char *conninfo)
static

Definition at line 726 of file pg_createsubscriber.c.

727{
728 PGconn *conn;
729 PGresult *res;
731
733 "getting system identifier from publisher");
734
735 conn = connect_database(conninfo, true);
736
737 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
739 {
741 "could not get system identifier: %s",
744 }
745 if (PQntuples(res) != 1)
746 {
748 "could not get system identifier: got %d rows, expected %d row",
749 PQntuples(res), 1);
751 }
752
753 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
754
756 "system identifier is %" PRIu64 " on publisher", sysid);
757
758 PQclear(res);
760
761 return sysid;
762}
uint64_t uint64
Definition c.h:625

References conn, connect_database(), disconnect_database(), fb(), PG_LOG_ERROR, PG_LOG_INFO, PG_LOG_PRIMARY, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, and report_createsub_log().

Referenced by main().

◆ get_publisher_databases()

static void get_publisher_databases ( struct CreateSubscriberOptions *opt,
bool  dbnamespecified 
)
static

Definition at line 2448 of file pg_createsubscriber.c.

2450{
2451 PGconn *conn;
2452 PGresult *res;
2453
2454 /* If a database name was specified, just connect to it. */
2455 if (dbnamespecified)
2457 else
2458 {
2459 /* Otherwise, try postgres first and then template1. */
2460 char *conninfo;
2461
2462 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2463 conn = connect_database(conninfo, false);
2464 pg_free(conninfo);
2465 if (!conn)
2466 {
2467 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2468 conn = connect_database(conninfo, true);
2469 pg_free(conninfo);
2470 }
2471 }
2472
2473 res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2474 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2475 {
2477 "could not obtain a list of databases: %s",
2479 PQclear(res);
2481 }
2482
2483 for (int i = 0; i < PQntuples(res); i++)
2484 {
2485 const char *dbname = PQgetvalue(res, i, 0);
2486
2488
2489 /* Increment num_dbs to reflect multiple --database options */
2490 num_dbs++;
2491 }
2492
2493 PQclear(res);
2494 disconnect_database(conn, false);
2495}
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition simple_list.c:63
SimpleStringList database_names

References concat_conninfo_dbname(), conn, connect_database(), CreateSubscriberOptions::database_names, dbname, disconnect_database(), fb(), i, num_dbs, pg_free(), PG_LOG_ERROR, PG_LOG_PRIMARY, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, CreateSubscriberOptions::pub_conninfo_str, report_createsub_log(), and simple_string_list_append().

Referenced by main().

◆ get_standby_sysid()

static uint64 get_standby_sysid ( const char *datadir)
static

Definition at line 770 of file pg_createsubscriber.c.

771{
773 bool crc_ok;
775
777 "getting system identifier from subscriber");
778
780 if (!crc_ok)
781 report_createsub_fatal("control file appears to be corrupt");
782
783 sysid = cf->system_identifier;
784
786 "system identifier is %" PRIu64 " on subscriber", sysid);
787
788 pg_free(cf);
789
790 return sysid;
791}
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)

References datadir, fb(), get_controlfile(), pg_free(), PG_LOG_INFO, PG_LOG_PRIMARY, report_createsub_fatal(), and report_createsub_log().

Referenced by main().

◆ get_sub_conninfo()

static char * get_sub_conninfo ( const struct CreateSubscriberOptions *opt)
static

Definition at line 464 of file pg_createsubscriber.c.

465{
467 char *ret;
468
469 appendConnStrItem(buf, "port", opt->sub_port);
470#if !defined(WIN32)
471 appendConnStrItem(buf, "host", opt->socket_dir);
472#endif
473 if (opt->sub_username != NULL)
474 appendConnStrItem(buf, "user", opt->sub_username);
475 appendConnStrItem(buf, "fallback_application_name", progname);
476
477 ret = pg_strdup(buf->data);
478
480
481 return ret;
482}

References appendConnStrItem(), buf, createPQExpBuffer(), destroyPQExpBuffer(), fb(), pg_strdup(), progname, CreateSubscriberOptions::socket_dir, CreateSubscriberOptions::sub_port, and CreateSubscriberOptions::sub_username.

Referenced by main().

◆ internal_log_file_write()

static void internal_log_file_write ( enum pg_log_level  level,
enum pg_log_part  part,
const char *pg_restrict  fmt,
va_list  args 
)
static

Definition at line 1082 of file pg_createsubscriber.c.

1084{
1086
1087 /* Do nothing if log level is too low. */
1088 if (level < __pg_log_level)
1089 return;
1090
1091 /* Add prefix based on the log part and log level */
1092 switch (part)
1093 {
1094 case PG_LOG_PRIMARY:
1095 switch (level)
1096 {
1097 case PG_LOG_ERROR:
1098 fprintf(internal_log_file_fp, _("error: "));
1099 break;
1100 case PG_LOG_WARNING:
1101 fprintf(internal_log_file_fp, _("warning: "));
1102 break;
1103 default:
1104 break;
1105 }
1106 break;
1107 case PG_LOG_DETAIL:
1108 fprintf(internal_log_file_fp, _("detail: "));
1109 break;
1110 case PG_LOG_HINT:
1111 fprintf(internal_log_file_fp, _("hint: "));
1112 break;
1113 }
1114
1116
1119}
#define fprintf(file, fmt, msg)
Definition cubescan.l:21
#define _(x)
Definition elog.c:95
enum pg_log_level __pg_log_level
Definition logging.c:21
static FILE * internal_log_file_fp
#define vfprintf
Definition port.h:263

References _, __pg_log_level, Assert, fb(), fprintf, internal_log_file_fp, PG_LOG_DETAIL, PG_LOG_ERROR, PG_LOG_HINT, PG_LOG_PRIMARY, PG_LOG_WARNING, and vfprintf.

Referenced by report_createsub_log_v().

◆ logfile_open()

static FILE * logfile_open ( const char *filename,
const char *mode 
)
static

Definition at line 1125 of file pg_createsubscriber.c.

1126{
1127 FILE *fh;
1128
1129 fh = fopen(filename, mode);
1130
1131 if (!fh)
1132 report_createsub_fatal("could not open log file \"%s\": %m",
1133 filename);
1134
1135 return fh;
1136}
static PgChecksumMode mode
static char * filename
Definition pg_dumpall.c:133

References fb(), filename, mode, and report_createsub_fatal().

Referenced by main().

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 2498 of file pg_createsubscriber.c.

2499{
2500 static struct option long_options[] =
2501 {
2502 {"all", no_argument, NULL, 'a'},
2503 {"database", required_argument, NULL, 'd'},
2504 {"pgdata", required_argument, NULL, 'D'},
2505 {"logdir", required_argument, NULL, 'l'},
2506 {"dry-run", no_argument, NULL, 'n'},
2507 {"subscriber-port", required_argument, NULL, 'p'},
2508 {"publisher-server", required_argument, NULL, 'P'},
2509 {"socketdir", required_argument, NULL, 's'},
2510 {"recovery-timeout", required_argument, NULL, 't'},
2511 {"enable-two-phase", no_argument, NULL, 'T'},
2512 {"subscriber-username", required_argument, NULL, 'U'},
2513 {"verbose", no_argument, NULL, 'v'},
2514 {"version", no_argument, NULL, 'V'},
2515 {"help", no_argument, NULL, '?'},
2516 {"config-file", required_argument, NULL, 1},
2517 {"publication", required_argument, NULL, 2},
2518 {"replication-slot", required_argument, NULL, 3},
2519 {"subscription", required_argument, NULL, 4},
2520 {"clean", required_argument, NULL, 5},
2521 {NULL, 0, NULL, 0}
2522 };
2523
2524 struct CreateSubscriberOptions opt = {0};
2525
2526 int c;
2527 int option_index;
2528
2529 char *pub_base_conninfo;
2530 char *sub_base_conninfo;
2531 char *dbname_conninfo = NULL;
2532
2535 struct stat statbuf;
2536
2537 char *consistent_lsn;
2538
2539 char pidfile[MAXPGPATH];
2540
2541 pg_logging_init(argv[0]);
2543 progname = get_progname(argv[0]);
2544 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2545
2546 if (argc > 1)
2547 {
2548 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2549 {
2550 usage();
2551 exit(0);
2552 }
2553 else if (strcmp(argv[1], "-V") == 0
2554 || strcmp(argv[1], "--version") == 0)
2555 {
2556 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2557 exit(0);
2558 }
2559 }
2560
2561 /* Default settings */
2563 opt.config_file = NULL;
2564 opt.log_dir = NULL;
2565 opt.pub_conninfo_str = NULL;
2566 opt.socket_dir = NULL;
2568 opt.sub_username = NULL;
2569 opt.two_phase = false;
2571 {
2572 0
2573 };
2574 opt.recovery_timeout = 0;
2575 opt.all_dbs = false;
2576
2577 /*
2578 * Don't allow it to be run as root. It uses pg_ctl which does not allow
2579 * it either.
2580 */
2581#ifndef WIN32
2582 if (geteuid() == 0)
2583 {
2585 "cannot be executed by \"root\"");
2587 "You must run %s as the PostgreSQL superuser.",
2588 progname);
2589 exit(1);
2590 }
2591#endif
2592
2594
2595 while ((c = getopt_long(argc, argv, "ad:D:l:np:P:s:t:TU:v",
2596 long_options, &option_index)) != -1)
2597 {
2598 switch (c)
2599 {
2600 case 'a':
2601 opt.all_dbs = true;
2602 break;
2603 case 'd':
2605 {
2607 num_dbs++;
2608 }
2609 else
2610 report_createsub_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2611 break;
2612 case 'D':
2615 break;
2616 case 'l':
2617 opt.log_dir = pg_strdup(optarg);
2619 break;
2620 case 'n':
2621 dry_run = true;
2622 break;
2623 case 'p':
2624 opt.sub_port = pg_strdup(optarg);
2625 break;
2626 case 'P':
2628 break;
2629 case 's':
2632 break;
2633 case 't':
2635 break;
2636 case 'T':
2637 opt.two_phase = true;
2638 break;
2639 case 'U':
2641 break;
2642 case 'v':
2644 break;
2645 case 1:
2647 break;
2648 case 2:
2650 {
2652 num_pubs++;
2653 }
2654 else
2655 report_createsub_fatal("publication \"%s\" specified more than once for --publication", optarg);
2656 break;
2657 case 3:
2659 {
2661 num_replslots++;
2662 }
2663 else
2664 report_createsub_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2665 break;
2666 case 4:
2668 {
2670 num_subs++;
2671 }
2672 else
2673 report_createsub_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2674 break;
2675 case 5:
2678 else
2679 report_createsub_fatal("object type \"%s\" specified more than once for --clean", optarg);
2680 break;
2681 default:
2682 /* getopt_long already emitted a complaint */
2684 "Try \"%s --help\" for more information.",
2685 progname);
2686 exit(1);
2687 }
2688 }
2689
2690 /* Validate that --all is not used with incompatible options */
2691 if (opt.all_dbs)
2692 {
2693 char *bad_switch = NULL;
2694
2695 if (num_dbs > 0)
2696 bad_switch = "--database";
2697 else if (num_pubs > 0)
2698 bad_switch = "--publication";
2699 else if (num_replslots > 0)
2700 bad_switch = "--replication-slot";
2701 else if (num_subs > 0)
2702 bad_switch = "--subscription";
2703
2704 if (bad_switch)
2705 {
2707 "options %s and %s cannot be used together",
2708 bad_switch, "-a/--all");
2710 "Try \"%s --help\" for more information.",
2711 progname);
2712 exit(1);
2713 }
2714 }
2715
2716 /* Any non-option arguments? */
2717 if (optind < argc)
2718 {
2720 "too many command-line arguments (first is \"%s\")",
2721 argv[optind]);
2723 "Try \"%s --help\" for more information.", progname);
2724 exit(1);
2725 }
2726
2727 /* Required arguments */
2728 if (subscriber_dir == NULL)
2729 {
2731 "no subscriber data directory specified");
2733 "Try \"%s --help\" for more information.", progname);
2734 exit(1);
2735 }
2736
2737 /* If socket directory is not provided, use the current directory */
2738 if (opt.socket_dir == NULL)
2739 {
2740 char cwd[MAXPGPATH];
2741
2742 if (!getcwd(cwd, MAXPGPATH))
2743 report_createsub_fatal("could not determine current directory");
2744 opt.socket_dir = pg_strdup(cwd);
2746 }
2747
2748 /*
2749 * Parse connection string. Build a base connection string that might be
2750 * reused by multiple databases.
2751 */
2752 if (opt.pub_conninfo_str == NULL)
2753 {
2754 /*
2755 * TODO use primary_conninfo (if available) from subscriber and
2756 * extract publisher connection string. Assume that there are
2757 * identical entries for physical and logical replication. If there is
2758 * not, we would fail anyway.
2759 */
2761 "no publisher connection string specified");
2763 "Try \"%s --help\" for more information.", progname);
2764 exit(1);
2765 }
2766
2767 if (opt.log_dir != NULL)
2768 {
2769 char *internal_log_file;
2770
2772
2773 /*
2774 * Set mask based on PGDATA permissions, needed for the creation of
2775 * the output directories with correct permissions, similar to
2776 * pg_ctl and pg_upgrade.
2777 *
2778 * Don't error here if the data directory cannot be stat'd. Upcoming
2779 * checks for the data directory would raise the fatal error later.
2780 */
2783
2786
2787 /* logfile_open() will exit if there is an error */
2790 }
2791
2792 if (dry_run)
2794 "Executing in dry-run mode.\n"
2795 "The target directory will not be modified.");
2796
2798 "validating publisher connection string");
2801 if (pub_base_conninfo == NULL)
2802 exit(1);
2803
2805 "validating subscriber connection string");
2807
2808 /*
2809 * Fetch all databases from the source (publisher) and treat them as if
2810 * the user had specified multiple --database options, one for each source
2811 * database.
2812 */
2813 if (opt.all_dbs)
2814 {
2816
2818 }
2819
2820 if (opt.database_names.head == NULL)
2821 {
2823 "no database was specified");
2824
2825 /*
2826 * Try to obtain the dbname from the publisher conninfo. If dbname
2827 * parameter is not available, error out.
2828 */
2829 if (dbname_conninfo)
2830 {
2832 num_dbs++;
2833
2835 "database name \"%s\" was extracted from the publisher connection string",
2837 }
2838 else
2839 {
2841 "no database name specified");
2843 "Try \"%s --help\" for more information.",
2844 progname);
2845 exit(1);
2846 }
2847 }
2848
2849 /* Number of object names must match number of databases */
2850 if (num_pubs > 0 && num_pubs != num_dbs)
2851 {
2853 "wrong number of publication names specified");
2855 "The number of specified publication names (%d) must match the number of specified database names (%d).",
2856 num_pubs, num_dbs);
2857 exit(1);
2858 }
2859 if (num_subs > 0 && num_subs != num_dbs)
2860 {
2862 "wrong number of subscription names specified");
2864 "The number of specified subscription names (%d) must match the number of specified database names (%d).",
2865 num_subs, num_dbs);
2866 exit(1);
2867 }
2868 if (num_replslots > 0 && num_replslots != num_dbs)
2869 {
2871 "wrong number of replication slot names specified");
2873 "The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2875 exit(1);
2876 }
2877
2878 /* Verify the object types specified for removal from the subscriber */
2879 for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2880 {
2881 if (pg_strcasecmp(cell->val, "publications") == 0)
2883 else
2884 {
2886 "invalid object type \"%s\" specified for %s",
2887 cell->val, "--clean");
2889 "The valid value is: \"%s\"", "publications");
2890 exit(1);
2891 }
2892 }
2893
2894 /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2895 pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2896 pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2897
2898 /* Rudimentary check for a data directory */
2900
2902
2903 /*
2904 * Store database information for publisher and subscriber. It should be
2905 * called before atexit() because its return is used in the
2906 * cleanup_objects_atexit().
2907 */
2909
2910 /* Register a function to clean up objects in case of failure */
2912
2913 /*
2914 * Check if the subscriber data directory has the same system identifier
2915 * as the publisher data directory.
2916 */
2919 if (pub_sysid != sub_sysid)
2920 report_createsub_fatal("subscriber data directory is not a copy of the source database cluster");
2921
2922 /* Subscriber PID file */
2923 snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2924
2925 /*
2926 * The standby server must not be running. If the server is started under
2927 * a service manager and pg_createsubscriber stops it, the service manager
2928 * might react to this action and start the server again. Therefore,
2929 * refuse to proceed if the server is running to avoid possible failures.
2930 */
2931 if (stat(pidfile, &statbuf) == 0)
2932 {
2934 "standby server is running");
2936 "Stop the standby server and try again.");
2937 exit(1);
2938 }
2939
2940 /*
2941 * Start a short-lived standby server with temporary parameters (provided
2942 * by command-line options). The goal is to avoid connections during the
2943 * transformation steps.
2944 */
2946 "starting the standby server with command-line options");
2947 start_standby_server(&opt, true, false);
2948
2949 /* Check if the standby server is ready for logical replication */
2951
2952 /* Check if the primary server is ready for logical replication */
2954
2955 /*
2956 * Stop the target server. The recovery process requires that the server
2957 * reaches a consistent state before targeting the recovery stop point.
2958 * Make sure a consistent state is reached (stopping the target server
2959 * guarantees it) *before* creating the replication slots in
2960 * setup_publisher().
2961 */
2963 "stopping the subscriber");
2965
2966 /* Create the required objects for each database on publisher */
2968
2969 /* Write the required recovery parameters */
2971
2972 /*
2973 * Start subscriber so the recovery parameters will take effect. Wait
2974 * until accepting connections. We don't want to start logical replication
2975 * during setup.
2976 */
2978 "starting the subscriber");
2979 start_standby_server(&opt, true, true);
2980
2981 /* Wait for the subscriber to be promoted */
2983
2984 /*
2985 * Create the subscription for each database on subscriber. It does not
2986 * enable it immediately because it needs to adjust the replication start
2987 * point to the LSN reported by setup_publisher(). It also cleans up
2988 * publications created by this tool and replication to the standby.
2989 */
2991
2992 /* Remove primary_slot_name if it exists on primary */
2994
2995 /* Remove failover replication slots if they exist on subscriber */
2997
2998 /* Stop the subscriber */
3000 "stopping the subscriber");
3002
3003 /* Change system identifier from subscriber */
3005
3006 success = true;
3007
3009 "Done!");
3010
3011 return 0;
3012}
#define PG_TEXTDOMAIN(domain)
Definition c.h:1303
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition exec.c:430
int pg_mode_mask
Definition file_perm.c:25
bool GetDataDirectoryCreatePerm(const char *dataDir)
#define PG_MODE_MASK_OWNER
Definition file_perm.h:24
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition getopt_long.c:60
#define no_argument
Definition getopt_long.h:25
#define required_argument
Definition getopt_long.h:26
void pg_logging_increase_verbosity(void)
Definition logging.c:185
void pg_logging_init(const char *argv0)
Definition logging.c:83
void pg_logging_set_level(enum pg_log_level new_level)
Definition logging.c:176
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
#define INTERNAL_LOG_FILE_NAME
static char * pg_ctl_path
static char * get_exec_path(const char *argv0, const char *progname)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static FILE * logfile_open(const char *filename, const char *mode)
static void check_data_directory(const char *datadir)
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static char logdir[MAXPGPATH]
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)
static char * pg_resetwal_path
static void make_output_dirs(const char *log_basedir)
static void usage(void)
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
PGDLLIMPORT int optind
Definition getopt.c:47
PGDLLIMPORT char * optarg
Definition getopt.c:49
int pg_strcasecmp(const char *s1, const char *s2)
void canonicalize_path(char *path)
Definition path.c:337
const char * get_progname(const char *argv0)
Definition path.c:652
char * c
void get_restricted_token(void)
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition simple_list.c:87
SimpleStringList objecttypes_to_clean
SimpleStringList replslot_names
SimpleStringListCell * head
Definition simple_list.h:42

References CreateSubscriberOptions::all_dbs, canonicalize_path(), check_data_directory(), check_publisher(), check_subscriber(), cleanup_objects_atexit(), CreateSubscriberOptions::config_file, CreateSubscriberOptions::database_names, LogicalRepInfos::dbinfo, dbinfos, DEFAULT_SUB_PORT, drop_failover_replication_slots(), drop_primary_replication_slot(), dry_run, fb(), get_base_conninfo(), get_exec_path(), get_primary_sysid(), get_progname(), get_publisher_databases(), get_restricted_token(), get_standby_sysid(), get_sub_conninfo(), GetDataDirectoryCreatePerm(), getopt_long(), SimpleStringList::head, internal_log_file_fp, INTERNAL_LOG_FILE_NAME, CreateSubscriberOptions::log_dir, logdir, logfile_open(), make_output_dirs(), MAXPGPATH, modify_subscriber_sysid(), no_argument, num_dbs, num_pubs, num_replslots, num_subs, OBJECTTYPE_PUBLICATIONS, CreateSubscriberOptions::objecttypes_to_clean, LogicalRepInfos::objecttypes_to_clean, optarg, optind, pg_ctl_path, pg_free(), PG_LOG_DETAIL, PG_LOG_ERROR, PG_LOG_HINT, PG_LOG_INFO, PG_LOG_PRIMARY, PG_LOG_WARNING, pg_logging_increase_verbosity(), pg_logging_init(), pg_logging_set_level(), pg_mode_mask, PG_MODE_MASK_OWNER, pg_resetwal_path, pg_strcasecmp(), pg_strdup(), PG_TEXTDOMAIN, primary_slot_name, progname, psprintf(), CreateSubscriberOptions::pub_conninfo_str, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, CreateSubscriberOptions::recovery_timeout, CreateSubscriberOptions::replslot_names, report_createsub_fatal(), report_createsub_log(), required_argument, set_pglocale_pgservice(), setup_publisher(), setup_recovery(), setup_subscriber(), simple_string_list_append(), simple_string_list_member(), snprintf, CreateSubscriberOptions::socket_dir, start_standby_server(), stat, stop_standby_server(), store_pub_sub_info(), CreateSubscriberOptions::sub_names, CreateSubscriberOptions::sub_port, CreateSubscriberOptions::sub_username, LogicalRepInfo::subconninfo, subscriber_dir, success, CreateSubscriberOptions::two_phase, 
LogicalRepInfos::two_phase, usage(), and wait_for_end_recovery().

◆ make_output_dirs()

static void make_output_dirs ( const char *log_basedir)
static

Definition at line 1139 of file pg_createsubscriber.c.

1140{
1141 char timestamp[128];
1142 struct timeval tval;
1143 time_t now;
1144 struct tm tmbuf;
1145 int len;
1146
1147 /* Generate timestamp */
1149 now = tval.tv_sec;
1150
1151 strftime(timestamp, sizeof(timestamp), "%Y%m%dT%H%M%S",
1152 localtime_r(&now, &tmbuf));
1153
1154 /* Append milliseconds */
1156 sizeof(timestamp) - strlen(timestamp), ".%03u",
1157 (unsigned int) (tval.tv_usec / 1000));
1158
1159 /* Build timestamp directory path */
1161
1162 if (len >= MAXPGPATH)
1163 report_createsub_fatal("directory path for log files is too long");
1164
1165 /* Create base directory (ignore if exists) */
1167 report_createsub_fatal("could not create directory \"%s\": %m", log_basedir);
1168
1169 /* Create a timestamp-named subdirectory under the base directory */
1171 report_createsub_fatal("could not create directory \"%s\": %m", logdir);
1172}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1603
int pg_dir_create_mode
Definition file_perm.c:18
static struct pg_tm tm
Definition localtime.c:104
const void size_t len
int64 timestamp
#define mkdir(a, b)
Definition win32_port.h:80
int gettimeofday(struct timeval *tp, void *tzp)

References fb(), gettimeofday(), len, logdir, MAXPGPATH, mkdir, now(), pg_dir_create_mode, report_createsub_fatal(), snprintf, and tm.

Referenced by main().

◆ modify_subscriber_sysid()

static void modify_subscriber_sysid ( const struct CreateSubscriberOptions *opt)
static

Definition at line 799 of file pg_createsubscriber.c.

800{
802 bool crc_ok;
803 struct timeval tv;
804
805 char *out_file;
806 char *cmd_str;
807
809 "modifying system identifier of subscriber");
810
812 if (!crc_ok)
813 report_createsub_fatal("control file appears to be corrupt");
814
815 /*
816 * Select a new system identifier.
817 *
818 * XXX this code was extracted from BootStrapXLOG().
819 */
820 gettimeofday(&tv, NULL);
821 cf->system_identifier = ((uint64) tv.tv_sec) << 32;
822 cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
823 cf->system_identifier |= getpid() & 0xFFF;
824
825 if (dry_run)
827 "dry-run: would set system identifier to %" PRIu64 " on subscriber",
828 cf->system_identifier);
829 else
830 {
833 "system identifier is %" PRIu64 " on subscriber",
834 cf->system_identifier);
835 }
836
837 if (dry_run)
839 "dry-run: would run pg_resetwal on the subscriber");
840 else
842 "running pg_resetwal on the subscriber");
843
844 /*
845 * Redirecting the output to the logfile if specified. Since the output
846 * would be very short, around one line, we do not provide a separate file
847 * for it; it's done as a part of the server log.
848 */
849 if (opt->log_dir)
851 else
853
854 cmd_str = psprintf("\"%s\" -D \"%s\" >> \"%s\"", pg_resetwal_path,
856 if (opt->log_dir)
858
860 "pg_resetwal command is: %s", cmd_str);
861
862 if (!dry_run)
863 {
864 int rc = system(cmd_str);
865
866 if (rc == 0)
868 "successfully reset WAL on the subscriber");
869 else
870 report_createsub_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
871 }
872
873 pg_free(cf);
875}
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
#define SERVER_LOG_FILE_NAME
#define DEVNULL
Definition port.h:161
char * wait_result_to_str(int exitstatus)
Definition wait_error.c:33

References DEVNULL, dry_run, fb(), get_controlfile(), gettimeofday(), CreateSubscriberOptions::log_dir, logdir, pg_free(), PG_LOG_DEBUG, PG_LOG_INFO, PG_LOG_PRIMARY, pg_resetwal_path, psprintf(), report_createsub_fatal(), report_createsub_log(), SERVER_LOG_FILE_NAME, subscriber_dir, update_controlfile(), and wait_result_to_str().

Referenced by main().

◆ pg_ctl_status()

static void pg_ctl_status ( const char *pg_ctl_cmd,
int  rc 
)
static

Definition at line 1856 of file pg_createsubscriber.c.

1857{
1858 if (rc != 0)
1859 {
1860 if (WIFEXITED(rc))
1861 {
1863 "pg_ctl failed with exit code %d",
1864 WEXITSTATUS(rc));
1865 }
1866 else if (WIFSIGNALED(rc))
1867 {
1868#if defined(WIN32)
1870 "pg_ctl was terminated by exception 0x%X",
1871 WTERMSIG(rc));
1873 "See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1874#else
1876 "pg_ctl was terminated by signal %d: %s",
1877 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1878#endif
1879 }
1880 else
1881 {
1883 "pg_ctl exited with unrecognized status %d", rc);
1884 }
1885
1887 "The failed command was: %s", pg_ctl_cmd);
1888 exit(1);
1889 }
1890}
const char * pg_strsignal(int signum)
Definition pgstrsignal.c:39
#define WIFEXITED(w)
Definition win32_port.h:150
#define WIFSIGNALED(w)
Definition win32_port.h:151
#define WTERMSIG(w)
Definition win32_port.h:153
#define WEXITSTATUS(w)
Definition win32_port.h:152

References fb(), PG_LOG_DETAIL, PG_LOG_ERROR, PG_LOG_PRIMARY, pg_strsignal(), report_createsub_log(), WEXITSTATUS, WIFEXITED, WIFSIGNALED, and WTERMSIG.

Referenced by start_standby_server(), and stop_standby_server().

◆ report_createsub_fatal()

static void report_createsub_fatal ( const char *pg_restrict  fmt,
  ... 
)
static

Definition at line 242 of file pg_createsubscriber.c.

243{
245
246 va_start(args, fmt);
247
249
250 va_end(args);
251
252 exit(1);
253}
static void report_createsub_log_v(enum pg_log_level level, enum pg_log_part part, const char *pg_restrict fmt, va_list args) pg_attribute_printf(3, 0)

References fb(), PG_LOG_ERROR, PG_LOG_PRIMARY, and report_createsub_log_v().

Referenced by check_data_directory(), get_exec_path(), get_standby_sysid(), logfile_open(), main(), make_output_dirs(), modify_subscriber_sysid(), setup_recovery(), and wait_for_end_recovery().

◆ report_createsub_log()

◆ report_createsub_log_v()

static void report_createsub_log_v ( enum pg_log_level  level,
enum pg_log_part  part,
const char *pg_restrict  fmt,
va_list  args 
)
static

Definition at line 206 of file pg_createsubscriber.c.

208{
209 int save_errno = errno;
210
212 {
213 /* Output to both stderr and the log file */
215
216 va_copy(arg_cpy, args);
219 /* Restore errno in case internal_log_file_write changed it */
221 }
222 pg_log_generic_v(level, part, fmt, args);
223}
void pg_log_generic_v(enum pg_log_level level, enum pg_log_part part, const char *pg_restrict fmt, va_list ap)
Definition logging.c:219
static void internal_log_file_write(enum pg_log_level level, enum pg_log_part part, const char *pg_restrict fmt, va_list args) pg_attribute_printf(3, 0)

References fb(), internal_log_file_fp, internal_log_file_write(), and pg_log_generic_v().

Referenced by report_createsub_fatal(), and report_createsub_log().

◆ server_is_in_recovery()

static bool server_is_in_recovery ( PGconn *conn)
static

Definition at line 1058 of file pg_createsubscriber.c.

1059{
1060 PGresult *res;
1061 int ret;
1062
1063 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
1064
1065 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1066 {
1068 "could not obtain recovery progress: %s",
1071 }
1072
1073
1074 ret = strcmp("t", PQgetvalue(res, 0, 0));
1075
1076 PQclear(res);
1077
1078 return ret == 0;
1079}

References conn, disconnect_database(), fb(), PG_LOG_ERROR, PG_LOG_PRIMARY, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, PQresultStatus, and report_createsub_log().

Referenced by check_publisher(), check_subscriber(), and wait_for_end_recovery().

◆ set_replication_progress()

static void set_replication_progress ( PGconn *conn,
const struct LogicalRepInfo *dbinfo,
const char *lsn 
)
static

Definition at line 2299 of file pg_createsubscriber.c.

2300{
2302 PGresult *res;
2303 Oid suboid;
2304 char *subname;
2305 char *dbname;
2306 char *originname;
2307 char *lsnstr;
2308
2309 Assert(conn != NULL);
2310
2311 subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
2312 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
2313
2315 "SELECT s.oid FROM pg_catalog.pg_subscription s "
2316 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
2317 "WHERE s.subname = %s AND d.datname = %s",
2318 subname, dbname);
2319
2320 res = PQexec(conn, str->data);
2321 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2322 {
2324 "could not obtain subscription OID: %s",
2327 }
2328
2329 if (PQntuples(res) != 1 && !dry_run)
2330 {
2332 "could not obtain subscription OID: got %d rows, expected %d row",
2333 PQntuples(res), 1);
2335 }
2336
2337 if (dry_run)
2338 {
2341 }
2342 else
2343 {
2344 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2345 lsnstr = psprintf("%s", lsn);
2346 }
2347
2348 PQclear(res);
2349
2350 /*
2351 * The origin name is defined as pg_%u. %u is the subscription OID. See
2352 * ApplyWorkerMain().
2353 */
2354 originname = psprintf("pg_%u", suboid);
2355
2356 if (dry_run)
2358 "dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2359 originname, lsnstr, dbinfo->dbname);
2360 else
2362 "setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2363 originname, lsnstr, dbinfo->dbname);
2364
2367 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
2369
2371 "command is: %s", str->data);
2372
2373 if (!dry_run)
2374 {
2375 res = PQexec(conn, str->data);
2376 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2377 {
2379 "could not set replication progress for subscription \"%s\": %s",
2380 dbinfo->subname, PQresultErrorMessage(res));
2382 }
2383 PQclear(res);
2384 }
2385
2389 pg_free(lsnstr);
2391}
#define InvalidOid
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
uint64 XLogRecPtr
Definition xlogdefs.h:21
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, fb(), InvalidOid, InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_free(), PG_LOG_DEBUG, PG_LOG_ERROR, PG_LOG_INFO, PG_LOG_PRIMARY, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, psprintf(), report_createsub_log(), resetPQExpBuffer(), str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ setup_publisher()

static char * setup_publisher ( struct LogicalRepInfo *dbinfo)
static

Definition at line 968 of file pg_createsubscriber.c.

969{
970 char *lsn = NULL;
971
972 pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
973
974 for (int i = 0; i < num_dbs; i++)
975 {
976 PGconn *conn;
977 char *genname = NULL;
978
979 conn = connect_database(dbinfo[i].pubconninfo, true);
980
981 /*
982 * If an object name was not specified as command-line options, assign
983 * a generated object name. The replication slot has a different rule.
984 * The subscription name is assigned to the replication slot name if
985 * no replication slot is specified. It follows the same rule as
986 * CREATE SUBSCRIPTION.
987 */
988 if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
990 if (num_pubs == 0)
991 dbinfo[i].pubname = pg_strdup(genname);
992 if (num_subs == 0)
993 dbinfo[i].subname = pg_strdup(genname);
994 if (num_replslots == 0)
995 dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
996
997 if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
998 {
999 /* Reuse existing publication on publisher. */
1001 "use existing publication \"%s\" in database \"%s\"",
1002 dbinfo[i].pubname, dbinfo[i].dbname);
1003 /* Don't remove pre-existing publication if an error occurs. */
1004 dbinfo[i].made_publication = false;
1005 }
1006 else
1007 {
1008 /*
1009 * Create publication on publisher. This step should be executed
1010 * *before* promoting the subscriber to avoid any transactions
1011 * between consistent LSN and the new publication rows (such
1012 * transactions wouldn't see the new publication rows resulting in
1013 * an error).
1014 */
1015 create_publication(conn, &dbinfo[i]);
1016 }
1017
1018 /* Create replication slot on publisher */
1019 if (lsn)
1020 pg_free(lsn);
1021 lsn = create_logical_replication_slot(conn, &dbinfo[i]);
1022 if (lsn == NULL && !dry_run)
1023 exit(1);
1024
1025 /*
1026 * Since we are using the LSN returned by the last replication slot as
1027 * recovery_target_lsn, this LSN is ahead of the current WAL position
1028 * and the recovery waits until the publisher writes a WAL record to
1029 * reach the target and ends the recovery. On idle systems, this wait
1030 * time is unpredictable and could lead to failure in promoting the
1031 * subscriber. To avoid that, insert a harmless WAL record.
1032 */
1033 if (i == num_dbs - 1 && !dry_run)
1034 {
1035 PGresult *res;
1036
1037 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
1038 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1039 {
1041 "could not write an additional WAL record: %s",
1044 }
1045 PQclear(res);
1046 }
1047
1048 disconnect_database(conn, false);
1049 }
1050
1051 return lsn;
1052}
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
static bool find_publication(PGconn *conn, const char *pubname, const char *dbname)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static char * generate_object_name(PGconn *conn)
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition pg_prng.c:89

References conn, connect_database(), create_logical_replication_slot(), create_publication(), dbname, disconnect_database(), dry_run, fb(), find_publication(), generate_object_name(), i, LogicalRepInfo::made_publication, num_dbs, num_pubs, num_replslots, num_subs, pg_free(), PG_LOG_ERROR, PG_LOG_INFO, PG_LOG_PRIMARY, pg_prng_seed(), pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQresultErrorMessage, PQresultStatus, prng_state, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, report_createsub_log(), LogicalRepInfo::subname, and subname.

Referenced by main().

◆ setup_recovery()

static void setup_recovery ( const struct LogicalRepInfo *dbinfo,
const char *datadir,
const char *lsn 
)
static

Definition at line 1577 of file pg_createsubscriber.c.

1578{
1579 PGconn *conn;
1581
1582 /*
1583 * Despite of the recovery parameters will be written to the subscriber,
1584 * use a publisher connection. The primary_conninfo is generated using the
1585 * connection settings.
1586 */
1587 conn = connect_database(dbinfo[0].pubconninfo, true);
1588
1589 /*
1590 * Write recovery parameters.
1591 *
1592 * The subscriber is not running yet. In dry run mode, the recovery
1593 * parameters *won't* be written. An invalid LSN is used for printing
1594 * purposes. Additional recovery parameters are added here. It avoids
1595 * unexpected behavior such as end of recovery as soon as a consistent
1596 * state is reached (recovery_target) and failure due to multiple recovery
1597 * targets (name, time, xid, LSN).
1598 */
1600 appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1602 "recovery_target_timeline = 'latest'\n");
1603
1604 /*
1605 * Set recovery_target_inclusive = false to avoid reapplying the
1606 * transaction committed at 'lsn' after subscription is enabled. This is
1607 * because the provided 'lsn' is also used as the replication start point
1608 * for the subscription. So, the server can send the transaction committed
1609 * at that 'lsn' after replication is started which can lead to applying
1610 * the same transaction twice if we keep recovery_target_inclusive = true.
1611 */
1613 "recovery_target_inclusive = false\n");
1615 "recovery_target_action = promote\n");
1616 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1617 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1618 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1619
1620 if (dry_run)
1621 {
1622 appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
1624 "recovery_target_lsn = '%X/%08X'\n",
1626 }
1627 else
1628 {
1629 appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1630 lsn);
1631 }
1632
1634 "recovery parameters:\n%s", recoveryconfcontents->data);
1635
1636 if (!dry_run)
1637 {
1639 FILE *fd;
1640
1641 /* Write the recovery parameters to INCLUDED_CONF_FILE */
1644 fd = fopen(conf_filename, "w");
1645 if (fd == NULL)
1646 report_createsub_fatal("could not open file \"%s\": %m", conf_filename);
1647
1649 report_createsub_fatal("could not write to file \"%s\": %m", conf_filename);
1650
1651 fclose(fd);
1652 recovery_params_set = true;
1653
1654 /* Include conditionally the recovery parameters. */
1657 "include_if_exists '" INCLUDED_CONF_FILE "'\n");
1659 }
1660
1661 disconnect_database(conn, false);
1662}
static PQExpBuffer recoveryconfcontents
static int fd(const char *x, int i)
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)

References appendPQExpBuffer(), appendPQExpBufferStr(), conn, connect_database(), PQExpBufferData::data, datadir, disconnect_database(), dry_run, fb(), fd(), GenerateRecoveryConfig(), INCLUDED_CONF_FILE, InvalidXLogRecPtr, PQExpBufferData::len, LSN_FORMAT_ARGS, MAXPGPATH, PG_LOG_DEBUG, PG_LOG_PRIMARY, recovery_params_set, recoveryconfcontents, report_createsub_fatal(), report_createsub_log(), resetPQExpBuffer(), snprintf, and WriteRecoveryConfig().

Referenced by main().

◆ setup_subscriber()

static void setup_subscriber ( struct LogicalRepInfo *dbinfo,
const char *consistent_lsn 
)
static

Definition at line 1541 of file pg_createsubscriber.c.

1542{
1543 for (int i = 0; i < num_dbs; i++)
1544 {
1545 PGconn *conn;
1546
1547 /* Connect to subscriber. */
1548 conn = connect_database(dbinfo[i].subconninfo, true);
1549
1550 /*
1551 * We don't need the pre-existing subscriptions on the newly formed
1552 * subscriber. They can connect to other publisher nodes and either
1553 * get some unwarranted data or can lead to ERRORs in connecting to
1554 * such nodes.
1555 */
1557
1558 /* Check and drop the required publications in the given database. */
1560
1561 create_subscription(conn, &dbinfo[i]);
1562
1563 /* Set the replication progress to the correct LSN */
1565
1566 /* Enable subscription */
1567 enable_subscription(conn, &dbinfo[i]);
1568
1569 disconnect_database(conn, false);
1570 }
1571}
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)

References check_and_drop_existing_subscriptions(), check_and_drop_publications(), conn, connect_database(), create_subscription(), disconnect_database(), enable_subscription(), fb(), i, num_dbs, and set_replication_progress().

Referenced by main().

◆ start_standby_server()

static void start_standby_server ( const struct CreateSubscriberOptions *opt,
bool  restricted_access,
bool  restrict_logical_worker 
)
static

Definition at line 1893 of file pg_createsubscriber.c.

1895{
1897 int rc;
1898
1899 appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1901 appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1902
1903 /* Prevent unintended slot invalidation */
1904 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1905
1907 {
1908 appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1909#if !defined(WIN32)
1910
1911 /*
1912 * An empty listen_addresses list means the server does not listen on
1913 * any IP interfaces; only Unix-domain sockets can be used to connect
1914 * to the server. Prevent external connections to minimize the chance
1915 * of failure.
1916 */
1917 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1918 if (opt->socket_dir)
1919 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1920 opt->socket_dir);
1922#endif
1923 }
1924 if (opt->config_file != NULL)
1925 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1926 opt->config_file);
1927
1928 /* Suppress to start logical replication if requested */
1930 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1931
1932 if (opt->log_dir)
1934
1936 "pg_ctl command is: %s", pg_ctl_cmd->data);
1937 rc = system(pg_ctl_cmd->data);
1938 pg_ctl_status(pg_ctl_cmd->data, rc);
1939 standby_running = true;
1942 "server was started");
1943}
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
void appendShellString(PQExpBuffer buf, const char *str)

References appendPQExpBuffer(), appendPQExpBufferChar(), appendPQExpBufferStr(), appendShellString(), CreateSubscriberOptions::config_file, createPQExpBuffer(), destroyPQExpBuffer(), fb(), CreateSubscriberOptions::log_dir, logdir, pg_ctl_path, pg_ctl_status(), PG_LOG_DEBUG, PG_LOG_INFO, PG_LOG_PRIMARY, report_createsub_log(), SERVER_LOG_FILE_NAME, CreateSubscriberOptions::socket_dir, standby_running, CreateSubscriberOptions::sub_port, and subscriber_dir.

Referenced by main().

◆ stop_standby_server()

static void stop_standby_server ( const char *datadir)
static

Definition at line 1946 of file pg_createsubscriber.c.

1947{
1948 char *pg_ctl_cmd;
1949 int rc;
1950
1951 pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1952 datadir);
1954 "pg_ctl command is: %s", pg_ctl_cmd);
1955 rc = system(pg_ctl_cmd);
1957 standby_running = false;
1959 "server was stopped");
1960}

References datadir, fb(), pg_ctl_path, pg_ctl_status(), PG_LOG_DEBUG, PG_LOG_INFO, PG_LOG_PRIMARY, psprintf(), report_createsub_log(), and standby_running.

Referenced by cleanup_objects_atexit(), main(), and wait_for_end_recovery().

◆ store_pub_sub_info()

static struct LogicalRepInfo * store_pub_sub_info ( const struct CreateSubscriberOptions *opt,
const char *pub_base_conninfo,
const char *sub_base_conninfo 
)
static

Definition at line 594 of file pg_createsubscriber.c.

597{
598 struct LogicalRepInfo *dbinfo;
602 int i = 0;
603
604 dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
605
606 if (num_pubs > 0)
607 pubcell = opt->pub_names.head;
608 if (num_subs > 0)
609 subcell = opt->sub_names.head;
610 if (num_replslots > 0)
612
613 for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
614 {
615 char *conninfo;
616
617 /* Fill publisher attributes */
618 conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
619 dbinfo[i].pubconninfo = conninfo;
620 dbinfo[i].dbname = cell->val;
621 if (num_pubs > 0)
622 dbinfo[i].pubname = pubcell->val;
623 else
624 dbinfo[i].pubname = NULL;
625 if (num_replslots > 0)
626 dbinfo[i].replslotname = replslotcell->val;
627 else
628 dbinfo[i].replslotname = NULL;
629 dbinfo[i].made_replslot = false;
630 dbinfo[i].made_publication = false;
631 /* Fill subscriber attributes */
632 conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
633 dbinfo[i].subconninfo = conninfo;
634 if (num_subs > 0)
635 dbinfo[i].subname = subcell->val;
636 else
637 dbinfo[i].subname = NULL;
638 /* Other fields will be filled later */
639
641 "publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
642 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
643 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
644 dbinfo[i].pubconninfo);
646 "subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
647 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
648 dbinfo[i].subconninfo,
649 dbinfos.two_phase ? "true" : "false");
650
651 if (num_pubs > 0)
652 pubcell = pubcell->next;
653 if (num_subs > 0)
654 subcell = subcell->next;
655 if (num_replslots > 0)
657
658 i++;
659 }
660
661 return dbinfo;
662}
#define pg_malloc_array(type, count)
Definition fe_memutils.h:56
static bool two_phase

References concat_conninfo_dbname(), CreateSubscriberOptions::database_names, dbinfos, LogicalRepInfo::dbname, fb(), SimpleStringList::head, i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, num_dbs, num_pubs, num_replslots, num_subs, PG_LOG_DEBUG, PG_LOG_PRIMARY, pg_malloc_array, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, CreateSubscriberOptions::replslot_names, LogicalRepInfo::replslotname, report_createsub_log(), CreateSubscriberOptions::sub_names, LogicalRepInfo::subconninfo, LogicalRepInfo::subname, subname, and LogicalRepInfos::two_phase.

Referenced by main().

◆ usage()

static void usage ( void  )
static

Definition at line 359 of file pg_createsubscriber.c.

360{
361 printf(_("%s creates a new logical replica from a standby server.\n\n"),
362 progname);
363 printf(_("Usage:\n"));
364 printf(_(" %s [OPTION]...\n"), progname);
365 printf(_("\nOptions:\n"));
366 printf(_(" -a, --all create subscriptions for all databases except template\n"
367 " databases and databases that don't allow connections\n"));
368 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
369 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
370 printf(_(" -l, --logdir=LOGDIR location for the log directory\n"));
371 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
372 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
373 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
374 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
375 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
376 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
377 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
378 printf(_(" -v, --verbose output verbose messages\n"));
379 printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
380 " databases on the subscriber; accepts: \"%s\"\n"), "publications");
381 printf(_(" --config-file=FILENAME use specified main server configuration\n"
382 " file when running target cluster\n"));
383 printf(_(" --publication=NAME publication name\n"));
384 printf(_(" --replication-slot=NAME replication slot name\n"));
385 printf(_(" --subscription=NAME subscription name\n"));
386 printf(_(" -V, --version output version information, then exit\n"));
387 printf(_(" -?, --help show this help, then exit\n"));
388 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
389 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
390}
#define printf(...)
Definition port.h:266

References _, DEFAULT_SUB_PORT, fb(), printf, and progname.

Referenced by main().

◆ wait_for_end_recovery()

static void wait_for_end_recovery ( const char * conninfo,
const struct CreateSubscriberOptions * opt 
)
static

Definition at line 1972 of file pg_createsubscriber.c.

1973{
1974 PGconn *conn;
1975 bool ready = false;
1976 int timer = 0;
1977
1979 "waiting for the target server to reach the consistent state");
1980
1981 conn = connect_database(conninfo, true);
1982
1983 for (;;)
1984 {
1985 /* Did the recovery process finish? We're done if so. */
1987 {
1988 ready = true;
1989 recovery_ended = true;
1990 break;
1991 }
1992
1993 /* Bail out after recovery_timeout seconds if this option is set */
1994 if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1995 {
1998 "recovery timed out");
2000 }
2001
2002 /* Keep waiting */
2005 }
2006
2007 disconnect_database(conn, false);
2008
2009 if (!ready)
2010 report_createsub_fatal("server did not end recovery");
2011
2013 "target server reached the consistent state");
2015 "If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
2016}
#define USECS_PER_SEC
Definition timestamp.h:134
#define WAIT_INTERVAL
void pg_usleep(long microsec)
Definition signal.c:53

References conn, connect_database(), disconnect_database(), dry_run, fb(), PG_LOG_ERROR, PG_LOG_HINT, PG_LOG_INFO, PG_LOG_PRIMARY, pg_usleep(), recovery_ended, CreateSubscriberOptions::recovery_timeout, report_createsub_fatal(), report_createsub_log(), server_is_in_recovery(), stop_standby_server(), subscriber_dir, USECS_PER_SEC, and WAIT_INTERVAL.

Referenced by main().

Variable Documentation

◆ dbinfos

◆ dry_run

◆ internal_log_file_fp

FILE* internal_log_file_fp = NULL
static

Definition at line 187 of file pg_createsubscriber.c.

Referenced by internal_log_file_write(), main(), and report_createsub_log_v().

◆ logdir

char logdir[MAXPGPATH]
static

◆ num_dbs

◆ num_pubs

int num_pubs = 0
static

Definition at line 178 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_replslots

int num_replslots = 0
static

Definition at line 180 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_subs

int num_subs = 0
static

Definition at line 179 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ pg_ctl_path

char* pg_ctl_path = NULL
static

Definition at line 184 of file pg_createsubscriber.c.

Referenced by main(), start_standby_server(), and stop_standby_server().

◆ pg_resetwal_path

char* pg_resetwal_path = NULL
static

Definition at line 185 of file pg_createsubscriber.c.

Referenced by main(), and modify_subscriber_sysid().

◆ primary_slot_name

char* primary_slot_name = NULL
static

Definition at line 171 of file pg_createsubscriber.c.

Referenced by check_subscriber(), drop_primary_replication_slot(), and main().

◆ prng_state

pg_prng_state prng_state
static

Definition at line 182 of file pg_createsubscriber.c.

Referenced by generate_object_name(), and setup_publisher().

◆ progname

const char* progname
static

Definition at line 169 of file pg_createsubscriber.c.

Referenced by get_exec_path(), get_sub_conninfo(), main(), and usage().

◆ recovery_ended

bool recovery_ended = false
static

Definition at line 195 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and wait_for_end_recovery().

◆ recovery_params_set

bool recovery_params_set = false
static

Definition at line 197 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and setup_recovery().

◆ standby_running

bool standby_running = false
static

◆ subscriber_dir

◆ success

bool success = false
static

Definition at line 174 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and main().