PostgreSQL Source Code git master
pg_createsubscriber.c File Reference
#include "postgres_fe.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/wait.h>
#include "common/connect.h"
#include "common/controldata_utils.h"
#include "common/file_utils.h"
#include "common/logging.h"
#include "common/pg_prng.h"
#include "common/restricted_token.h"
#include "datatype/timestamp.h"
#include "fe_utils/recovery_gen.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
#include "fe_utils/version.h"
#include "getopt_long.h"
Include dependency graph for pg_createsubscriber.c:

Go to the source code of this file.

Data Structures

struct  CreateSubscriberOptions
 
struct  LogicalRepInfo
 
struct  LogicalRepInfos
 

Macros

#define DEFAULT_SUB_PORT   "50432"
 
#define OBJECTTYPE_PUBLICATIONS   0x0001
 
#define PG_AUTOCONF_FILENAME   "postgresql.auto.conf"
 
#define INCLUDED_CONF_FILE   "pg_createsubscriber.conf"
 
#define INCLUDED_CONF_FILE_DISABLED   INCLUDED_CONF_FILE ".disabled"
 
#define WAIT_INTERVAL   1 /* 1 second */
 

Functions

static void cleanup_objects_atexit (void)
 
static void usage (void)
 
static char * get_base_conninfo (const char *conninfo, char **dbname)
 
static char * get_sub_conninfo (const struct CreateSubscriberOptions *opt)
 
static char * get_exec_path (const char *argv0, const char *progname)
 
static void check_data_directory (const char *datadir)
 
static char * concat_conninfo_dbname (const char *conninfo, const char *dbname)
 
static struct LogicalRepInfo * store_pub_sub_info (const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
 
static PGconn * connect_database (const char *conninfo, bool exit_on_error)
 
static void disconnect_database (PGconn *conn, bool exit_on_error)
 
static uint64 get_primary_sysid (const char *conninfo)
 
static uint64 get_standby_sysid (const char *datadir)
 
static void modify_subscriber_sysid (const struct CreateSubscriberOptions *opt)
 
static bool server_is_in_recovery (PGconn *conn)
 
static char * generate_object_name (PGconn *conn)
 
static void check_publisher (const struct LogicalRepInfo *dbinfo)
 
static char * setup_publisher (struct LogicalRepInfo *dbinfo)
 
static void check_subscriber (const struct LogicalRepInfo *dbinfo)
 
static void setup_subscriber (struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 
static void setup_recovery (const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
 
static void drop_primary_replication_slot (struct LogicalRepInfo *dbinfo, const char *slotname)
 
static void drop_failover_replication_slots (struct LogicalRepInfo *dbinfo)
 
static char * create_logical_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void drop_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
 
static void pg_ctl_status (const char *pg_ctl_cmd, int rc)
 
static void start_standby_server (const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
 
static void stop_standby_server (const char *datadir)
 
static void wait_for_end_recovery (const char *conninfo, const struct CreateSubscriberOptions *opt)
 
static void create_publication (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static bool find_publication (PGconn *conn, const char *pubname, const char *dbname)
 
static void drop_publication (PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
 
static void check_and_drop_publications (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void create_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void set_replication_progress (PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
 
static void enable_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void check_and_drop_existing_subscriptions (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void drop_existing_subscription (PGconn *conn, const char *subname, const char *dbname)
 
static void get_publisher_databases (struct CreateSubscriberOptions *opt, bool dbnamespecified)
 
static void appendConnStrItem (PQExpBuffer buf, const char *keyword, const char *val)
 
int main (int argc, char **argv)
 

Variables

static const char * progname
 
static char * primary_slot_name = NULL
 
static bool dry_run = false
 
static bool success = false
 
static struct LogicalRepInfos dbinfos
 
static int num_dbs = 0
 
static int num_pubs = 0
 
static int num_subs = 0
 
static int num_replslots = 0
 
static pg_prng_state prng_state
 
static char * pg_ctl_path = NULL
 
static char * pg_resetwal_path = NULL
 
static char * subscriber_dir = NULL
 
static bool recovery_ended = false
 
static bool standby_running = false
 
static bool recovery_params_set = false
 

Macro Definition Documentation

◆ DEFAULT_SUB_PORT

#define DEFAULT_SUB_PORT   "50432"

Definition at line 34 of file pg_createsubscriber.c.

◆ INCLUDED_CONF_FILE

#define INCLUDED_CONF_FILE   "pg_createsubscriber.conf"

Definition at line 49 of file pg_createsubscriber.c.

◆ INCLUDED_CONF_FILE_DISABLED

#define INCLUDED_CONF_FILE_DISABLED   INCLUDED_CONF_FILE ".disabled"

Definition at line 50 of file pg_createsubscriber.c.

◆ OBJECTTYPE_PUBLICATIONS

#define OBJECTTYPE_PUBLICATIONS   0x0001

Definition at line 35 of file pg_createsubscriber.c.

◆ PG_AUTOCONF_FILENAME

#define PG_AUTOCONF_FILENAME   "postgresql.auto.conf"

Definition at line 48 of file pg_createsubscriber.c.

◆ WAIT_INTERVAL

#define WAIT_INTERVAL   1 /* 1 second */

Definition at line 150 of file pg_createsubscriber.c.

Function Documentation

◆ appendConnStrItem()

static void appendConnStrItem ( PQExpBuffer  buf,
const char *  keyword,
const char *  val 
)
static

Definition at line 312 of file pg_createsubscriber.c.

313{
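314 if (buf->len > 0)
315 appendPQExpBufferChar(buf, ' ');
316 appendPQExpBufferStr(buf, keyword);
317 appendPQExpBufferChar(buf, '=');
318 appendConnStrVal(buf, val);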
319}
long val
Definition: informix.c:689
static char buf[DEFAULT_XLOG_SEG_SIZE]
Definition: pg_test_fsync.c:71
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
void appendConnStrVal(PQExpBuffer buf, const char *str)
Definition: string_utils.c:698

References appendConnStrVal(), appendPQExpBufferChar(), appendPQExpBufferStr(), buf, and val.

Referenced by concat_conninfo_dbname(), get_base_conninfo(), and get_sub_conninfo().

◆ check_and_drop_existing_subscriptions()

static void check_and_drop_existing_subscriptions ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1244 of file pg_createsubscriber.c.

1246{
1247 PQExpBuffer query = createPQExpBuffer();
1248 char *dbname;
1249 PGresult *res;
1250
1251 Assert(conn != NULL);
1252
1253 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1254
1255 appendPQExpBuffer(query,
1256 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1257 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1258 "WHERE d.datname = %s",
1259 dbname);
1260 res = PQexec(conn, query->data);
1261
1262 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1263 {
1264 pg_log_error("could not obtain pre-existing subscriptions: %s",
1265 PQresultErrorMessage(res));
1266 disconnect_database(conn, true);
1267 }
1268
1269 for (int i = 0; i < PQntuples(res); i++)
1270 drop_existing_subscription(conn, PQgetvalue(res, i, 0),
1271 dbinfo->dbname);
1272
1273 PQclear(res);
1274 destroyPQExpBuffer(query);
1275 PQfreemem(dbname);
1276}
void PQfreemem(void *ptr)
Definition: fe-exec.c:4049
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4399
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2279
Assert(PointerIsAligned(start, uint64))
int i
Definition: isn.c:77
#define PQresultErrorMessage
#define PQgetvalue
Definition: libpq-be-fe.h:253
#define PQclear
Definition: libpq-be-fe.h:245
#define PQresultStatus
Definition: libpq-be-fe.h:247
#define PQntuples
Definition: libpq-be-fe.h:251
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:128
#define pg_log_error(...)
Definition: logging.h:106
static void drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
static void disconnect_database(PGconn *conn, bool exit_on_error)
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
char * dbname
Definition: streamutil.c:49
PGconn * conn
Definition: streamutil.c:52

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), PQExpBufferData::data, LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), drop_existing_subscription(), i, pg_log_error, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQntuples, PQresultErrorMessage, and PQresultStatus.

Referenced by setup_subscriber().

◆ check_and_drop_publications()

static void check_and_drop_publications ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1866 of file pg_createsubscriber.c.

1867{
1868 PGresult *res;
1870
1871 Assert(conn != NULL);
1872
1873 if (drop_all_pubs)
1874 {
1875 pg_log_info("dropping all existing publications in database \"%s\"",
1876 dbinfo->dbname);
1877
1878 /* Fetch all publication names */
1879 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1880 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1881 {
1882 pg_log_error("could not obtain publication information: %s",
1884 PQclear(res);
1886 }
1887
1888 /* Drop each publication */
1889 for (int i = 0; i < PQntuples(res); i++)
1890 drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1891 &dbinfo->made_publication);
1892
1893 PQclear(res);
1894 }
1895 else
1896 {
1897 /* Drop publication only if it was created by this tool */
1898 if (dbinfo->made_publication)
1899 {
1900 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1901 &dbinfo->made_publication);
1902 }
1903 else
1904 {
1905 if (dry_run)
1906 pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
1907 dbinfo->pubname, dbinfo->dbname);
1908 else
1909 pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
1910 dbinfo->pubname, dbinfo->dbname);
1911 }
1912 }
1913}
#define pg_log_info(...)
Definition: logging.h:124
static struct LogicalRepInfos dbinfos
static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
static bool dry_run
#define OBJECTTYPE_PUBLICATIONS

References Assert(), conn, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), dry_run, i, LogicalRepInfo::made_publication, OBJECTTYPE_PUBLICATIONS, LogicalRepInfos::objecttypes_to_clean, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, and LogicalRepInfo::pubname.

Referenced by setup_subscriber().

◆ check_data_directory()

static void check_data_directory ( const char *  datadir)
static

Definition at line 440 of file pg_createsubscriber.c.

441{
442 struct stat statbuf;
443 uint32 major_version;
444 char *version_str;
445
446 pg_log_info("checking if directory \"%s\" is a cluster data directory",
447 datadir);
448
449 if (stat(datadir, &statbuf) != 0)
450 {
451 if (errno == ENOENT)
452 pg_fatal("data directory \"%s\" does not exist", datadir);
453 else
454 pg_fatal("could not access directory \"%s\": %m", datadir);
455 }
456
457 /*
458 * Retrieve the contents of this cluster's PG_VERSION. We require
459 * compatibility with the same major version as the one this tool is
460 * compiled with.
461 */
462 major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
463 if (major_version != PG_MAJORVERSION_NUM)
464 {
465 pg_log_error("data directory is of wrong version");
466 pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
467 "PG_VERSION", version_str, PG_MAJORVERSION);
468 exit(1);
469 }
470}
uint32_t uint32
Definition: c.h:552
uint32 get_pg_version(const char *datadir, char **version_str)
Definition: version.c:44
#define pg_log_error_detail(...)
Definition: logging.h:109
#define pg_fatal(...)
char * datadir
#define GET_PG_MAJORVERSION_NUM(v)
Definition: version.h:19
#define stat
Definition: win32_port.h:274

References datadir, GET_PG_MAJORVERSION_NUM, get_pg_version(), pg_fatal, pg_log_error, pg_log_error_detail, pg_log_info, and stat.

Referenced by main().

◆ check_publisher()

static void check_publisher ( const struct LogicalRepInfo *  dbinfo)
static

Definition at line 960 of file pg_createsubscriber.c.

961{
962 PGconn *conn;
963 PGresult *res;
964 bool failed = false;
965
966 char *wal_level;
967 int max_repslots;
968 int cur_repslots;
969 int max_walsenders;
970 int cur_walsenders;
971 int max_prepared_transactions;
972 char *max_slot_wal_keep_size;
973
974 pg_log_info("checking settings on publisher");
975
976 conn = connect_database(dbinfo[0].pubconninfo, true);
977
978 /*
979 * If the primary server is in recovery (i.e. cascading replication),
980 * objects (publication) cannot be created because it is read only.
981 */
983 {
984 pg_log_error("primary server cannot be in recovery");
986 }
987
988 /*------------------------------------------------------------------------
989 * Logical replication requires a few parameters to be set on publisher.
990 * Since these parameters are not a requirement for physical replication,
991 * we should check it to make sure it won't fail.
992 *
993 * - wal_level >= replica
994 * - max_replication_slots >= current + number of dbs to be converted
995 * - max_wal_senders >= current + number of dbs to be converted
996 * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
997 * -----------------------------------------------------------------------
998 */
999 res = PQexec(conn,
1000 "SELECT pg_catalog.current_setting('wal_level'),"
1001 " pg_catalog.current_setting('max_replication_slots'),"
1002 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
1003 " pg_catalog.current_setting('max_wal_senders'),"
1004 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
1005 " pg_catalog.current_setting('max_prepared_transactions'),"
1006 " pg_catalog.current_setting('max_slot_wal_keep_size')");
1007
1008 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1009 {
1010 pg_log_error("could not obtain publisher settings: %s",
1013 }
1014
1015 wal_level = pg_strdup(PQgetvalue(res, 0, 0));
1016 max_repslots = atoi(PQgetvalue(res, 0, 1));
1017 cur_repslots = atoi(PQgetvalue(res, 0, 2));
1018 max_walsenders = atoi(PQgetvalue(res, 0, 3));
1019 cur_walsenders = atoi(PQgetvalue(res, 0, 4));
1020 max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
1021 max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
1022
1023 PQclear(res);
1024
1025 pg_log_debug("publisher: wal_level: %s", wal_level);
1026 pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
1027 pg_log_debug("publisher: current replication slots: %d", cur_repslots);
1028 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
1029 pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
1030 pg_log_debug("publisher: max_prepared_transactions: %d",
1031 max_prepared_transactions);
1032 pg_log_debug("publisher: max_slot_wal_keep_size: %s",
1033 max_slot_wal_keep_size);
1034
1035 disconnect_database(conn, false);
1036
1037 if (strcmp(wal_level, "minimal") == 0)
1038 {
1039 pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
1040 failed = true;
1041 }
1042
1043 if (max_repslots - cur_repslots < num_dbs)
1044 {
1045 pg_log_error("publisher requires %d replication slots, but only %d remain",
1046 num_dbs, max_repslots - cur_repslots);
1047 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1048 "max_replication_slots", cur_repslots + num_dbs);
1049 failed = true;
1050 }
1051
1052 if (max_walsenders - cur_walsenders < num_dbs)
1053 {
1054 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
1055 num_dbs, max_walsenders - cur_walsenders);
1056 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1057 "max_wal_senders", cur_walsenders + num_dbs);
1058 failed = true;
1059 }
1060
1061 if (max_prepared_transactions != 0 && !dbinfos.two_phase)
1062 {
1063 pg_log_warning("two_phase option will not be enabled for replication slots");
1064 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
1065 "Prepared transactions will be replicated at COMMIT PREPARED.");
1066 pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
1067 }
1068
1069 /*
1070 * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
1071 * is set to a non-default value, it may cause replication failures due to
1072 * required WAL files being prematurely removed.
1073 */
1074 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
1075 {
1076 pg_log_warning("required WAL could be removed from the publisher");
1077 pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
1078 "max_slot_wal_keep_size");
1079 }
1080
1082
1083 if (failed)
1084 exit(1);
1085}
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_warning_hint(...)
Definition: logging.h:121
#define pg_log_warning_detail(...)
Definition: logging.h:118
#define pg_log_debug(...)
Definition: logging.h:133
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
#define pg_log_warning(...)
Definition: pgfnames.c:24
int wal_level
Definition: xlog.c:134

References conn, connect_database(), dbinfos, disconnect_database(), dry_run, num_dbs, pg_free(), pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_log_warning, pg_log_warning_detail, pg_log_warning_hint, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, PQresultStatus, server_is_in_recovery(), LogicalRepInfos::two_phase, and wal_level.

Referenced by main().

◆ check_subscriber()

static void check_subscriber ( const struct LogicalRepInfo *  dbinfo)
static

Definition at line 1099 of file pg_createsubscriber.c.

1100{
1101 PGconn *conn;
1102 PGresult *res;
1103 bool failed = false;
1104
1105 int max_lrworkers;
1106 int max_reporigins;
1107 int max_wprocs;
1108
1109 pg_log_info("checking settings on subscriber");
1110
1111 conn = connect_database(dbinfo[0].subconninfo, true);
1112
1113 /* The target server must be a standby */
1115 {
1116 pg_log_error("target server must be a standby");
1118 }
1119
1120 /*------------------------------------------------------------------------
1121 * Logical replication requires a few parameters to be set on subscriber.
1122 * Since these parameters are not a requirement for physical replication,
1123 * we should check it to make sure it won't fail.
1124 *
1125 * - max_active_replication_origins >= number of dbs to be converted
1126 * - max_logical_replication_workers >= number of dbs to be converted
1127 * - max_worker_processes >= 1 + number of dbs to be converted
1128 *------------------------------------------------------------------------
1129 */
1130 res = PQexec(conn,
1131 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1132 "'max_logical_replication_workers', "
1133 "'max_active_replication_origins', "
1134 "'max_worker_processes', "
1135 "'primary_slot_name') "
1136 "ORDER BY name");
1137
1138 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1139 {
1140 pg_log_error("could not obtain subscriber settings: %s",
1143 }
1144
1145 max_reporigins = atoi(PQgetvalue(res, 0, 0));
1146 max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1147 max_wprocs = atoi(PQgetvalue(res, 2, 0));
1148 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1150
1151 pg_log_debug("subscriber: max_logical_replication_workers: %d",
1152 max_lrworkers);
1153 pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
1154 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1156 pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1157
1158 PQclear(res);
1159
1160 disconnect_database(conn, false);
1161
1162 if (max_reporigins < num_dbs)
1163 {
1164 pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1165 num_dbs, max_reporigins);
1166 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1167 "max_active_replication_origins", num_dbs);
1168 failed = true;
1169 }
1170
1171 if (max_lrworkers < num_dbs)
1172 {
1173 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1174 num_dbs, max_lrworkers);
1175 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1176 "max_logical_replication_workers", num_dbs);
1177 failed = true;
1178 }
1179
1180 if (max_wprocs < num_dbs + 1)
1181 {
1182 pg_log_error("subscriber requires %d worker processes, but only %d remain",
1183 num_dbs + 1, max_wprocs);
1184 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1185 "max_worker_processes", num_dbs + 1);
1186 failed = true;
1187 }
1188
1189 if (failed)
1190 exit(1);
1191}
static char * primary_slot_name

References conn, connect_database(), disconnect_database(), num_dbs, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, PQresultStatus, primary_slot_name, and server_is_in_recovery().

Referenced by main().

◆ cleanup_objects_atexit()

static void cleanup_objects_atexit ( void  )
static

Definition at line 192 of file pg_createsubscriber.c.

193{
194 /* Rename the included configuration file, if necessary. */
196 {
197 char conf_filename[MAXPGPATH];
198 char conf_filename_disabled[MAXPGPATH];
199
200 snprintf(conf_filename, MAXPGPATH, "%s/%s", subscriber_dir,
202 snprintf(conf_filename_disabled, MAXPGPATH, "%s/%s", subscriber_dir,
204
205 if (durable_rename(conf_filename, conf_filename_disabled) != 0)
206 {
207 /* durable_rename() has already logged something. */
208 pg_log_warning_hint("A manual removal of the recovery parameters may be required.");
209 }
210 }
211
212 if (success)
213 return;
214
215 /*
216 * If the server is promoted, there is no way to use the current setup
217 * again. Warn the user that a new replication setup should be done before
218 * trying again.
219 */
220 if (recovery_ended)
221 {
222 pg_log_warning("failed after the end of recovery");
223 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
224 "You must recreate the physical replica before continuing.");
225 }
226
227 for (int i = 0; i < num_dbs; i++)
228 {
229 struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
230
231 if (dbinfo->made_publication || dbinfo->made_replslot)
232 {
233 PGconn *conn;
234
235 conn = connect_database(dbinfo->pubconninfo, false);
236 if (conn != NULL)
237 {
238 if (dbinfo->made_publication)
239 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
240 &dbinfo->made_publication);
241 if (dbinfo->made_replslot)
242 drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
244 }
245 else
246 {
247 /*
248 * If a connection could not be established, inform the user
249 * that some objects were left on primary and should be
250 * removed before trying again.
251 */
252 if (dbinfo->made_publication)
253 {
254 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
255 dbinfo->pubname,
256 dbinfo->dbname);
257 pg_log_warning_hint("Drop this publication before trying again.");
258 }
259 if (dbinfo->made_replslot)
260 {
261 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
262 dbinfo->replslotname,
263 dbinfo->dbname);
264 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
265 }
266 }
267 }
268 }
269
270 if (standby_running)
272}
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:779
#define MAXPGPATH
static void stop_standby_server(const char *datadir)
static char * subscriber_dir
#define INCLUDED_CONF_FILE
static bool success
static bool recovery_ended
#define INCLUDED_CONF_FILE_DISABLED
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
static bool recovery_params_set
#define snprintf
Definition: port.h:260
struct LogicalRepInfo * dbinfo

References conn, connect_database(), LogicalRepInfos::dbinfo, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), drop_replication_slot(), durable_rename(), i, INCLUDED_CONF_FILE, INCLUDED_CONF_FILE_DISABLED, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, MAXPGPATH, num_dbs, pg_log_warning, pg_log_warning_hint, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, recovery_ended, recovery_params_set, LogicalRepInfo::replslotname, snprintf, standby_running, stop_standby_server(), subscriber_dir, and success.

Referenced by main().

◆ concat_conninfo_dbname()

static char * concat_conninfo_dbname ( const char *  conninfo,
const char *  dbname 
)
static

Definition at line 480 of file pg_createsubscriber.c.

481{
482 PQExpBuffer buf = createPQExpBuffer();
483 char *ret;
484
485 Assert(conninfo != NULL);
486
487 appendPQExpBufferStr(buf, conninfo);
488 appendConnStrItem(buf, "dbname", dbname);
489
490 ret = pg_strdup(buf->data);
491 destroyPQExpBuffer(buf);
492
493 return ret;
494}
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)

References appendConnStrItem(), appendPQExpBufferStr(), Assert(), buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), and pg_strdup().

Referenced by get_publisher_databases(), and store_pub_sub_info().

◆ connect_database()

static PGconn * connect_database ( const char *  conninfo,
bool  exit_on_error 
)
static

Definition at line 577 of file pg_createsubscriber.c.

578{
579 PGconn *conn;
580 PGresult *res;
581
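582 conn = PQconnectdb(conninfo);
583 if (PQstatus(conn) != CONNECTION_OK)
584 {
585 pg_log_error("connection to database failed: %s",
586 PQerrorMessage(conn));
587 PQfinish(conn);
588
589 if (exit_on_error)
590 exit(1);
591 return NULL;
592 }
593
594 /* Secure search_path */
595 res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
596 if (PQresultStatus(res) != PGRES_TUPLES_OK)
597 {
598 pg_log_error("could not clear \"search_path\": %s",
599 PQresultErrorMessage(res));
600 PQclear(res);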
601 PQfinish(conn);
602
603 if (exit_on_error)
604 exit(1);
605 return NULL;
606 }
607 PQclear(res);
608
609 return conn;
610}
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:825
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7641
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5316
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7704
@ CONNECTION_OK
Definition: libpq-fe.h:84

References ALWAYS_SECURE_SEARCH_PATH_SQL, conn, CONNECTION_OK, pg_log_error, PGRES_TUPLES_OK, PQclear, PQconnectdb(), PQerrorMessage(), PQexec(), PQfinish(), PQresultErrorMessage, PQresultStatus, and PQstatus().

Referenced by check_publisher(), check_subscriber(), cleanup_objects_atexit(), drop_failover_replication_slots(), drop_primary_replication_slot(), get_primary_sysid(), get_publisher_databases(), setup_publisher(), setup_recovery(), setup_subscriber(), and wait_for_end_recovery().

◆ create_logical_replication_slot()

static char * create_logical_replication_slot ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1486 of file pg_createsubscriber.c.

1487{
1489 PGresult *res = NULL;
1490 const char *slot_name = dbinfo->replslotname;
1491 char *slot_name_esc;
1492 char *lsn = NULL;
1493
1494 Assert(conn != NULL);
1495
1496 if (dry_run)
1497 pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1498 slot_name, dbinfo->dbname);
1499 else
1500 pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
1501 slot_name, dbinfo->dbname);
1502
1503 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1504
1506 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1507 slot_name_esc,
1508 dbinfos.two_phase ? "true" : "false");
1509
1510 PQfreemem(slot_name_esc);
1511
1512 pg_log_debug("command is: %s", str->data);
1513
1514 if (!dry_run)
1515 {
1516 res = PQexec(conn, str->data);
1517 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1518 {
1519 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1520 slot_name, dbinfo->dbname,
1522 PQclear(res);
1524 return NULL;
1525 }
1526
1527 lsn = pg_strdup(PQgetvalue(res, 0, 0));
1528 PQclear(res);
1529 }
1530
1531 /* For cleanup purposes */
1532 dbinfo->made_replslot = true;
1533
1535
1536 return lsn;
1537}
const char * str

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_replslot, pg_log_debug, pg_log_error, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQresultErrorMessage, PQresultStatus, LogicalRepInfo::replslotname, str, and LogicalRepInfos::two_phase.

Referenced by setup_publisher().

◆ create_publication()

static void create_publication ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1732 of file pg_createsubscriber.c.

1733{
1735 PGresult *res;
1736 char *ipubname_esc;
1737 char *spubname_esc;
1738
1739 Assert(conn != NULL);
1740
1741 ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1742 spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1743
1744 /* Check if the publication already exists */
1746 "SELECT 1 FROM pg_catalog.pg_publication "
1747 "WHERE pubname = %s",
1748 spubname_esc);
1749 res = PQexec(conn, str->data);
1750 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1751 {
1752 pg_log_error("could not obtain publication information: %s",
1755 }
1756
1757 if (PQntuples(res) == 1)
1758 {
1759 /*
1760 * Unfortunately, if it reaches this code path, it will always fail
1761 * (unless you decide to change the existing publication name). That's
1762 * bad but it is very unlikely that the user will choose a name with
1763 * pg_createsubscriber_ prefix followed by the exact database oid and
1764 * a random number.
1765 */
1766 pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1767 pg_log_error_hint("Consider renaming this publication before continuing.");
1769 }
1770
1771 PQclear(res);
1773
1774 if (dry_run)
1775 pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
1776 dbinfo->pubname, dbinfo->dbname);
1777 else
1778 pg_log_info("creating publication \"%s\" in database \"%s\"",
1779 dbinfo->pubname, dbinfo->dbname);
1780
1781 appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1782 ipubname_esc);
1783
1784 pg_log_debug("command is: %s", str->data);
1785
1786 if (!dry_run)
1787 {
1788 res = PQexec(conn, str->data);
1789 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1790 {
1791 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1792 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1794 }
1795 PQclear(res);
1796 }
1797
1798 /* For cleanup purposes */
1799 dbinfo->made_publication = true;
1800
1801 PQfreemem(ipubname_esc);
1802 PQfreemem(spubname_esc);
1804}
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4405
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, LogicalRepInfo::made_publication, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear, PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQntuples, PQresultErrorMessage, PQresultStatus, LogicalRepInfo::pubname, resetPQExpBuffer(), and str.

Referenced by setup_publisher().

◆ create_subscription()

static void create_subscription ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1927 of file pg_createsubscriber.c.

/*
 * create_subscription
 *
 * Build a CREATE SUBSCRIPTION command for the database described by dbinfo
 * and, unless dry_run is set, execute it on conn.  The command always sets
 * create_slot = false, enabled = false and copy_data = false, names
 * dbinfo->replslotname as the slot, and takes two_phase from
 * dbinfos.two_phase.
 *
 * conn must be non-NULL (asserted).  Nothing is returned.
 *
 * NOTE(review): this listing skips embedded source lines 1929, 1950, 1971
 * and 1976, so the declaration of "str", the call that the format string
 * below belongs to, and whatever follows the error message are not visible
 * here -- confirm against the original file.
 */
1928{
1930 PGresult *res;
1931 char *pubname_esc;
1932 char *subname_esc;
1933 char *pubconninfo_esc;
1934 char *replslotname_esc;
1935
1936 Assert(conn != NULL);
1937
/*
 * Names used as SQL identifiers go through PQescapeIdentifier(); values used
 * as string literals (connection string, slot name) go through
 * PQescapeLiteral().
 */
1938 pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1939 subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1940 pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1941 replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1942
1943 if (dry_run)
1944 pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
1945 dbinfo->subname, dbinfo->dbname);
1946 else
1947 pg_log_info("creating subscription \"%s\" in database \"%s\"",
1948 dbinfo->subname, dbinfo->dbname);
1949
1951 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1952 "WITH (create_slot = false, enabled = false, "
1953 "slot_name = %s, copy_data = false, two_phase = %s)",
1954 subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
1955 dbinfos.two_phase ? "true" : "false");
1956
/* The escaped copies are only needed while formatting the command. */
1957 PQfreemem(pubname_esc);
1958 PQfreemem(subname_esc);
1959 PQfreemem(pubconninfo_esc);
1960 PQfreemem(replslotname_esc);
1961
1962 pg_log_debug("command is: %s", str->data);
1963
/* In dry-run mode the command is logged above but never sent. */
1964 if (!dry_run)
1965 {
1966 res = PQexec(conn, str->data);
1967 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1968 {
1969 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1970 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
1972 }
1973 PQclear(res);
1974 }
1975
1977}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, str, LogicalRepInfo::subname, and LogicalRepInfos::two_phase.

Referenced by setup_subscriber().

◆ disconnect_database()

◆ drop_existing_subscription()

static void drop_existing_subscription ( PGconn *  conn,
const char *  subname,
const char *  dbname 
)
static

Definition at line 1200 of file pg_createsubscriber.c.

/*
 * drop_existing_subscription
 *
 * Disable subscription "subname", detach it from its slot
 * (slot_name = NONE) and drop it, sending all three commands to conn as one
 * query string.  In dry-run mode only an informational message is logged.
 *
 * conn must be non-NULL (asserted).  dbname is used only in log messages.
 *
 * NOTE(review): subname is interpolated into the SQL text as-is; no
 * identifier escaping is visible in this function -- confirm that callers
 * only pass names that are safe to embed unquoted.
 *
 * NOTE(review): this listing skips embedded source lines 1202, 1230 and
 * 1231, so the declaration of "query" and the remainder of the error path
 * are not visible here.
 */
1201{
1203 PGresult *res;
1204
1205 Assert(conn != NULL);
1206
1207 /*
1208 * Construct a query string. These commands are allowed to be executed
1209 * within a transaction.
1210 */
1211 appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1212 subname);
1213 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1214 subname);
1215 appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1216
1217 if (dry_run)
1218 pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
1219 subname, dbname);
1220 else
1221 {
1222 pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1223 subname, dbname);
1224
1225 res = PQexec(conn, query->data);
1226
1227 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1228 {
1229 pg_log_error("could not drop subscription \"%s\": %s",
1232 }
1233
1234 PQclear(res);
1235 }
1236
1237 destroyPQExpBuffer(query);
1238}
NameData subname

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), PQExpBufferData::data, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQexec(), PQresultErrorMessage, PQresultStatus, and subname.

Referenced by check_and_drop_existing_subscriptions().

◆ drop_failover_replication_slots()

static void drop_failover_replication_slots ( struct LogicalRepInfo *  dbinfo)
static

Definition at line 1444 of file pg_createsubscriber.c.

/*
 * drop_failover_replication_slots
 *
 * Connect using dbinfo[0].subconninfo, list every row of
 * pg_replication_slots whose "failover" column is true, and call
 * drop_replication_slot() for each returned slot name.
 *
 * This is best-effort: a failed connection or a failed query only produces
 * warnings plus a hint asking the user to drop the slots manually.
 *
 * NOTE(review): embedded source line 1465 (the argument completing the
 * pg_log_warning() call) is missing from this listing.
 */
1445{
1446 PGconn *conn;
1447 PGresult *res;
1448
/* Passing false asks connect_database() not to exit on failure; NULL is handled below. */
1449 conn = connect_database(dbinfo[0].subconninfo, false);
1450 if (conn != NULL)
1451 {
1452 /* Get failover replication slot names */
1453 res = PQexec(conn,
1454 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1455
1456 if (PQresultStatus(res) == PGRES_TUPLES_OK)
1457 {
1458 /* Remove failover replication slots from subscriber */
1459 for (int i = 0; i < PQntuples(res); i++)
1460 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1461 }
1462 else
1463 {
1464 pg_log_warning("could not obtain failover replication slot information: %s",
1466 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1467 }
1468
1469 PQclear(res);
1470 disconnect_database(conn, false);
1471 }
1472 else
1473 {
1474 pg_log_warning("could not drop failover replication slot");
1475 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1476 }
1477}

References conn, connect_database(), disconnect_database(), drop_replication_slot(), i, pg_log_warning, pg_log_warning_hint, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, and PQresultStatus.

Referenced by main().

◆ drop_primary_replication_slot()

static void drop_primary_replication_slot ( struct LogicalRepInfo *  dbinfo,
const char *  slotname 
)
static

Definition at line 1414 of file pg_createsubscriber.c.

/*
 * drop_primary_replication_slot
 *
 * Drop replication slot "slotname" by connecting with dbinfo[0].pubconninfo
 * and calling drop_replication_slot().  Does nothing when primary_slot_name
 * is unset.  A failed connection is reported as a warning with a hint to
 * drop the slot manually; it is not fatal.
 *
 * NOTE(review): the early-return guard tests primary_slot_name (not a
 * parameter or local of this function) rather than the slotname argument --
 * presumably callers pass that same value; confirm.
 */
1415{
1416 PGconn *conn;
1417
1418 /* Replication slot does not exist, do nothing */
1419 if (!primary_slot_name)
1420 return;
1421
1422 conn = connect_database(dbinfo[0].pubconninfo, false);
1423 if (conn != NULL)
1424 {
1425 drop_replication_slot(conn, &dbinfo[0], slotname);
1426 disconnect_database(conn, false);
1427 }
1428 else
1429 {
1430 pg_log_warning("could not drop replication slot \"%s\" on primary",
1431 slotname);
1432 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1433 }
1434}

References conn, connect_database(), disconnect_database(), drop_replication_slot(), pg_log_warning, pg_log_warning_hint, and primary_slot_name.

Referenced by main().

◆ drop_publication()

static void drop_publication ( PGconn *  conn,
const char *  pubname,
const char *  dbname,
bool *  made_publication 
)
static

Definition at line 1810 of file pg_createsubscriber.c.

/*
 * drop_publication
 *
 * Issue DROP PUBLICATION for "pubname" on conn, or only log the intent in
 * dry-run mode.  dbname is used only in log messages.
 *
 * On failure the error is logged and *made_publication is set to false so
 * the drop is not attempted again; the function still returns normally.
 *
 * conn must be non-NULL (asserted).
 *
 * NOTE(review): this listing skips embedded source lines 1813 and 1854, so
 * the declaration of "str" and the statement just before the closing brace
 * are not visible here.
 */
1812{
1814 PGresult *res;
1815 char *pubname_esc;
1816
1817 Assert(conn != NULL);
1818
/* The name is used as an SQL identifier, so escape it as one. */
1819 pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1820
1821 if (dry_run)
1822 pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
1823 pubname, dbname);
1824 else
1825 pg_log_info("dropping publication \"%s\" in database \"%s\"",
1826 pubname, dbname);
1827
1828 appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1829
1830 PQfreemem(pubname_esc);
1831
1832 pg_log_debug("command is: %s", str->data);
1833
1834 if (!dry_run)
1835 {
1836 res = PQexec(conn, str->data);
1837 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1838 {
1839 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1840 pubname, dbname, PQresultErrorMessage(res));
1841 *made_publication = false; /* don't try again. */
1842
1843 /*
1844 * Don't disconnect and exit here. This routine is used by primary
1845 * (cleanup publication / replication slot due to an error) and
1846 * subscriber (remove the replicated publications). In both cases,
1847 * it can continue and provide instructions for the user to remove
1848 * it later if cleanup fails.
1849 */
1850 }
1851 PQclear(res);
1852 }
1853
1855}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), dbname, destroyPQExpBuffer(), dry_run, pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, and str.

Referenced by check_and_drop_publications(), and cleanup_objects_atexit().

◆ drop_replication_slot()

static void drop_replication_slot ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo,
const char *  slot_name 
)
static

Definition at line 1540 of file pg_createsubscriber.c.

/*
 * drop_replication_slot
 *
 * Drop replication slot "slot_name" by running
 * pg_catalog.pg_drop_replication_slot() on conn, or only log the intent in
 * dry-run mode.  dbinfo->dbname is used in log messages.
 *
 * On failure the error is logged and dbinfo->made_replslot is cleared so
 * the drop is not attempted again; the function still returns normally.
 *
 * conn must be non-NULL (asserted).
 *
 * NOTE(review): this listing skips embedded source lines 1543 and 1577, so
 * the declaration of "str" and the statement just before the closing brace
 * are not visible here.
 */
1542{
1544 char *slot_name_esc;
1545 PGresult *res;
1546
1547 Assert(conn != NULL);
1548
1549 if (dry_run)
1550 pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1551 slot_name, dbinfo->dbname);
1552 else
1553 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1554 slot_name, dbinfo->dbname);
1555
/* The slot name is passed as a function argument, so escape it as a literal. */
1556 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1557
1558 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1559
1560 PQfreemem(slot_name_esc);
1561
1562 pg_log_debug("command is: %s", str->data);
1563
1564 if (!dry_run)
1565 {
1566 res = PQexec(conn, str->data);
/* The command is a SELECT, so success is reported as PGRES_TUPLES_OK. */
1567 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1568 {
1569 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1570 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1571 dbinfo->made_replslot = false; /* don't try again. */
1572 }
1573
1574 PQclear(res);
1575 }
1576
1578}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_replslot, pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, and str.

Referenced by cleanup_objects_atexit(), drop_failover_replication_slots(), and drop_primary_replication_slot().

◆ enable_subscription()

static void enable_subscription ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 2085 of file pg_createsubscriber.c.

/*
 * enable_subscription
 *
 * Issue ALTER SUBSCRIPTION ... ENABLE for dbinfo->subname on conn, or only
 * log the intent in dry-run mode.
 *
 * conn must be non-NULL (asserted).  Nothing is returned.
 *
 * NOTE(review): this listing skips embedded source lines 2087, 2113, 2119
 * and 2120, so the declaration of "str", the rest of the error path, and
 * any release of "subname" / "str" are not visible here -- confirm against
 * the original file.
 */
2086{
2088 PGresult *res;
2089 char *subname;
2090
2091 Assert(conn != NULL);
2092
/*
 * The local "subname" holds the identifier-escaped form used in the SQL
 * command; log messages below use the raw dbinfo->subname.
 */
2093 subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
2094
2095 if (dry_run)
2096 pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
2097 dbinfo->subname, dbinfo->dbname);
2098 else
2099 pg_log_info("enabling subscription \"%s\" in database \"%s\"",
2100 dbinfo->subname, dbinfo->dbname);
2101
2102 appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
2103
2104 pg_log_debug("command is: %s", str->data);
2105
2106 if (!dry_run)
2107 {
2108 res = PQexec(conn, str->data);
2109 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2110 {
2111 pg_log_error("could not enable subscription \"%s\": %s",
2112 dbinfo->subname, PQresultErrorMessage(res));
2114 }
2115
2116 PQclear(res);
2117 }
2118
2121}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ find_publication()

static bool find_publication ( PGconn *  conn,
const char *  pubname,
const char *  dbname 
)
static

Definition at line 808 of file pg_createsubscriber.c.

/*
 * find_publication
 *
 * Query pg_catalog.pg_publication on conn for a row whose pubname equals
 * "pubname".  Returns true when exactly one row comes back, false
 * otherwise.  dbname is used only in the error message.
 *
 * NOTE(review): this listing skips embedded source lines 810, 815, 820,
 * 824 and 832, so the declaration of "str", the call the format string
 * belongs to, the condition guarding the error block, and the rest of the
 * error path are not visible here.
 */
809{
811 PGresult *res;
812 bool found = false;
/* The name is compared as a string value, so escape it as a literal. */
813 char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
814
816 "SELECT 1 FROM pg_catalog.pg_publication "
817 "WHERE pubname = %s",
818 pubname_esc);
819 res = PQexec(conn, str->data);
821 {
822 pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
823 pubname, dbname, PQerrorMessage(conn));
825 }
826
827 if (PQntuples(res) == 1)
828 found = true;
829
830 PQclear(res);
831 PQfreemem(pubname_esc);
833
834 return found;
835}

References appendPQExpBuffer(), conn, createPQExpBuffer(), dbname, destroyPQExpBuffer(), disconnect_database(), pg_log_error, PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQescapeLiteral(), PQexec(), PQfreemem(), PQntuples, PQresultStatus, and str.

Referenced by setup_publisher().

◆ generate_object_name()

static char * generate_object_name ( PGconn *  conn)
static

Definition at line 762 of file pg_createsubscriber.c.

/*
 * generate_object_name
 *
 * Build a name of the form "pg_createsubscriber_<dboid>_<hex random>",
 * where <dboid> is the OID of the database conn is connected to (looked up
 * in pg_catalog.pg_database) and the random part is a 32-bit value drawn
 * from prng_state.
 *
 * Returns the string produced by psprintf().
 *
 * NOTE(review): this listing skips embedded source lines 772, 775, 776 and
 * 783, so the condition guarding the first error block and the remainder
 * of both error paths are not visible here.
 */
763{
764 PGresult *res;
765 Oid oid;
766 uint32 rand;
767 char *objname;
768
769 res = PQexec(conn,
770 "SELECT oid FROM pg_catalog.pg_database "
771 "WHERE datname = pg_catalog.current_database()");
773 {
774 pg_log_error("could not obtain database OID: %s",
777 }
778
779 if (PQntuples(res) != 1)
780 {
781 pg_log_error("could not obtain database OID: got %d rows, expected %d row",
782 PQntuples(res), 1);
784 }
785
786 /* Database OID */
787 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
788
789 PQclear(res);
790
791 /* Random unsigned integer */
792 rand = pg_prng_uint32(&prng_state);
793
794 /*
795 * Build the object name. The name must not exceed NAMEDATALEN - 1. This
796 * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
797 * '\0').
798 */
799 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
800
801 return objname;
802}
static pg_prng_state prng_state
uint32 pg_prng_uint32(pg_prng_state *state)
Definition: pg_prng.c:227
unsigned int Oid
Definition: postgres_ext.h:32
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43

References conn, disconnect_database(), pg_log_error, pg_prng_uint32(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, prng_state, and psprintf().

Referenced by setup_publisher().

◆ get_base_conninfo()

static char * get_base_conninfo ( const char *  conninfo,
char **  dbname 
)
static

Definition at line 334 of file pg_createsubscriber.c.

/*
 * get_base_conninfo
 *
 * Parse "conninfo" and rebuild it without its dbname keyword, so the result
 * can serve as a base to which a database name is added later.
 *
 * Every keyword with a non-empty value is appended to the output, except
 * "dbname": its value is instead copied with pg_strdup() into *dbname when
 * the dbname pointer is non-NULL.
 *
 * Returns a pg_strdup()'d connection string, or NULL (after logging an
 * error) when the input cannot be parsed.
 *
 * NOTE(review): this listing skips embedded source lines 336, 346, 350 and
 * 367, so the declaration of "buf", part of the parse-error path, and the
 * cleanup before PQconninfoFree() are not visible here.
 */
335{
337 PQconninfoOption *conn_opts;
338 PQconninfoOption *conn_opt;
339 char *errmsg = NULL;
340 char *ret;
341
342 conn_opts = PQconninfoParse(conninfo, &errmsg);
343 if (conn_opts == NULL)
344 {
345 pg_log_error("could not parse connection string: %s", errmsg);
347 return NULL;
348 }
349
/* The option array ends with an entry whose keyword is NULL. */
351 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
352 {
/* Only options that are actually set (non-empty value) are considered. */
353 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
354 {
355 if (strcmp(conn_opt->keyword, "dbname") == 0)
356 {
357 if (dbname)
358 *dbname = pg_strdup(conn_opt->val);
359 continue;
360 }
361 appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
362 }
363 }
364
365 ret = pg_strdup(buf->data);
366
368 PQconninfoFree(conn_opts);
369
370 return ret;
371}
int errmsg(const char *fmt,...)
Definition: elog.c:1080
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7525
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:6241

References appendConnStrItem(), buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), errmsg(), _PQconninfoOption::keyword, pg_log_error, pg_strdup(), PQconninfoFree(), PQconninfoParse(), PQfreemem(), and _PQconninfoOption::val.

Referenced by main().

◆ get_exec_path()

static char * get_exec_path ( const char *  argv0,
const char *  progname 
)
static

Definition at line 404 of file pg_createsubscriber.c.

/*
 * get_exec_path
 *
 * Locate the executable "progname" relative to argv0 and check that it
 * reports the expected "<progname> (PostgreSQL) <PG_VERSION>" version
 * string, using find_other_exec().
 *
 * Returns the path stored in exec_path.  If find_other_exec() fails, the
 * function ends with pg_fatal(): a return of -1 is reported as "not found",
 * any other negative value as a version mismatch.
 *
 * NOTE(review): embedded source line 411 is missing from this listing, so
 * the statement that sets up exec_path before it is passed to
 * find_other_exec() is not visible here.
 *
 * NOTE(review): versionstr is not released anywhere in the visible code --
 * presumably acceptable for a short-lived command-line tool; confirm.
 */
405{
406 char *versionstr;
407 char *exec_path;
408 int ret;
409
410 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
412 ret = find_other_exec(argv0, progname, versionstr, exec_path);
413
414 if (ret < 0)
415 {
416 char full_path[MAXPGPATH];
417
/* Used only for the error message; fall back to the bare program name. */
418 if (find_my_exec(argv0, full_path) < 0)
419 strlcpy(full_path, progname, sizeof(full_path));
420
421 if (ret == -1)
422 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
423 progname, "pg_createsubscriber", full_path);
424 else
425 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
426 progname, full_path, "pg_createsubscriber");
427 }
428
429 pg_log_debug("%s path is: %s", progname, exec_path);
430
431 return exec_path;
432}
int find_my_exec(const char *argv0, char *retpath)
Definition: exec.c:161
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition: exec.c:311
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static const char * progname
static char * argv0
Definition: pg_ctl.c:94
static char * exec_path
Definition: pg_ctl.c:89
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45

References argv0, exec_path, find_my_exec(), find_other_exec(), MAXPGPATH, pg_fatal, pg_log_debug, pg_malloc(), progname, psprintf(), and strlcpy().

Referenced by main().

◆ get_primary_sysid()

static uint64 get_primary_sysid ( const char *  conninfo)
static

Definition at line 632 of file pg_createsubscriber.c.

/*
 * get_primary_sysid
 *
 * Connect with "conninfo", read system_identifier from
 * pg_catalog.pg_control_system(), and return it as a uint64.  The value is
 * also reported with pg_log_info().
 *
 * NOTE(review): this listing skips embedded source lines 643, 646, 647, 653
 * and 661, so the condition guarding the first error block, the remainder
 * of both error paths, and the statement between PQclear() and the return
 * are not visible here.
 */
633{
634 PGconn *conn;
635 PGresult *res;
636 uint64 sysid;
637
638 pg_log_info("getting system identifier from publisher");
639
640 conn = connect_database(conninfo, true);
641
642 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
644 {
645 pg_log_error("could not get system identifier: %s",
648 }
649 if (PQntuples(res) != 1)
650 {
651 pg_log_error("could not get system identifier: got %d rows, expected %d row",
652 PQntuples(res), 1);
654 }
655
/* The value arrives as text; parse it as a base-10 unsigned 64-bit number. */
656 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
657
658 pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
659
660 PQclear(res);
662
663 return sysid;
664}
uint64_t uint64
Definition: c.h:553

References conn, connect_database(), disconnect_database(), pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, and PQresultStatus.

Referenced by main().

◆ get_publisher_databases()

static void get_publisher_databases ( struct CreateSubscriberOptions *  opt,
bool  dbnamespecified 
)
static

Definition at line 2129 of file pg_createsubscriber.c.

/*
 * get_publisher_databases
 *
 * Connect to the publisher and fetch the names of all databases that are
 * not templates, allow connections, and have datconnlimit <> -2, ordered by
 * name.  num_dbs is incremented once per returned row.
 *
 * When dbnamespecified is false, the connection is attempted against
 * "postgres" first and then "template1"; only the second attempt is made
 * with exit-on-error enabled.
 *
 * NOTE(review): this listing skips embedded source lines 2137, 2159 and
 * 2166, so the statement for the dbnamespecified case, the end of the
 * query-error path, and the per-row statement that precedes num_dbs++ are
 * not visible here.
 *
 * NOTE(review): pg_database is not schema-qualified in the query below,
 * unlike the pg_catalog-qualified queries elsewhere in this file --
 * presumably harmless if the connection pins search_path; confirm.
 */
2131{
2132 PGconn *conn;
2133 PGresult *res;
2134
2135 /* If a database name was specified, just connect to it. */
2136 if (dbnamespecified)
2138 else
2139 {
2140 /* Otherwise, try postgres first and then template1. */
2141 char *conninfo;
2142
2143 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2144 conn = connect_database(conninfo, false);
2145 pg_free(conninfo);
2146 if (!conn)
2147 {
2148 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2149 conn = connect_database(conninfo, true);
2150 pg_free(conninfo);
2151 }
2152 }
2153
2154 res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2155 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2156 {
2157 pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2158 PQclear(res);
2160 }
2161
2162 for (int i = 0; i < PQntuples(res); i++)
2163 {
2164 const char *dbname = PQgetvalue(res, i, 0);
2165
2167
2168 /* Increment num_dbs to reflect multiple --database options */
2169 num_dbs++;
2170 }
2171
2172 PQclear(res);
2173 disconnect_database(conn, false);
2174}
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition: simple_list.c:63
SimpleStringList database_names

References concat_conninfo_dbname(), conn, connect_database(), CreateSubscriberOptions::database_names, dbname, disconnect_database(), i, num_dbs, pg_free(), pg_log_error, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, CreateSubscriberOptions::pub_conninfo_str, and simple_string_list_append().

Referenced by main().

◆ get_standby_sysid()

static uint64 get_standby_sysid ( const char *  datadir)
static

Definition at line 672 of file pg_createsubscriber.c.

/*
 * get_standby_sysid
 *
 * Read the control file under "datadir" with get_controlfile() and return
 * its system_identifier.  The value is also reported with pg_log_info().
 *
 * Ends with pg_fatal() if get_controlfile() reports a CRC mismatch.  The
 * control-file structure is released with pg_free() before returning.
 */
673{
674 ControlFileData *cf;
675 bool crc_ok;
676 uint64 sysid;
677
678 pg_log_info("getting system identifier from subscriber");
679
680 cf = get_controlfile(datadir, &crc_ok);
681 if (!crc_ok)
682 pg_fatal("control file appears to be corrupt");
683
684 sysid = cf->system_identifier;
685
686 pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
687
688 pg_free(cf);
689
690 return sysid;
691}
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
uint64 system_identifier
Definition: pg_control.h:112

References datadir, get_controlfile(), pg_fatal, pg_free(), pg_log_info, and ControlFileData::system_identifier.

Referenced by main().

◆ get_sub_conninfo()

static char * get_sub_conninfo ( const struct CreateSubscriberOptions *  opt)
static

Definition at line 378 of file pg_createsubscriber.c.

/*
 * get_sub_conninfo
 *
 * Assemble the subscriber connection string from the command-line options
 * in "opt": port (opt->sub_port), host (opt->socket_dir, non-Windows builds
 * only), user (only when opt->sub_username is set) and
 * fallback_application_name (progname).  No dbname is included.
 *
 * Returns a pg_strdup()'d copy of the assembled string.
 *
 * NOTE(review): this listing skips embedded source lines 380 and 393, so
 * the declaration of "buf" and the statement between the pg_strdup() call
 * and the return are not visible here.
 */
379{
381 char *ret;
382
383 appendConnStrItem(buf, "port", opt->sub_port);
384#if !defined(WIN32)
385 appendConnStrItem(buf, "host", opt->socket_dir);
386#endif
387 if (opt->sub_username != NULL)
388 appendConnStrItem(buf, "user", opt->sub_username);
389 appendConnStrItem(buf, "fallback_application_name", progname);
390
391 ret = pg_strdup(buf->data);
392
394
395 return ret;
396}

References appendConnStrItem(), buf, createPQExpBuffer(), destroyPQExpBuffer(), pg_strdup(), progname, CreateSubscriberOptions::socket_dir, CreateSubscriberOptions::sub_port, and CreateSubscriberOptions::sub_username.

Referenced by main().

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 2177 of file pg_createsubscriber.c.

/*
 * main
 *
 * Entry point of pg_createsubscriber.  In the order shown by the visible
 * code it:
 *   - handles --help / --version when given as the first argument;
 *   - refuses to run as root (non-Windows builds);
 *   - parses the command-line options and rejects options that conflict
 *     with -a/--all, stray non-option arguments, and missing required
 *     settings (subscriber data directory, publisher connection string);
 *   - builds the base publisher and subscriber connection strings;
 *   - determines the list of databases (from --all, from -d/--database, or
 *     from the dbname in the publisher connection string);
 *   - checks that the numbers of publication, subscription and replication
 *     slot names, when given, equal the number of databases;
 *   - locates pg_ctl and pg_resetwal, stores per-database information and
 *     registers cleanup_objects_atexit();
 *   - verifies that publisher and subscriber share a system identifier and
 *     that no postmaster.pid exists in the subscriber data directory;
 *   - starts the standby, sets up the publisher and the recovery
 *     parameters, restarts the subscriber, and sets up the subscriptions.
 *
 * Returns 0 on success; error paths end with exit(1) or pg_fatal().
 *
 * NOTE(review): many embedded source lines are missing from this listing
 * (gaps in the line numbers, e.g. 2220, 2244, 2247, 2268, most option-case
 * bodies, 2519-2521, 2537, 2567, 2570, 2580, 2597, 2608-2618).  Several
 * comments below therefore have no visible statement after them, and some
 * "else" branches have no visible "if" -- confirm against the original
 * file before relying on this listing.
 */
2178{
2179 static struct option long_options[] =
2180 {
2181 {"all", no_argument, NULL, 'a'},
2182 {"database", required_argument, NULL, 'd'},
2183 {"pgdata", required_argument, NULL, 'D'},
2184 {"dry-run", no_argument, NULL, 'n'},
2185 {"subscriber-port", required_argument, NULL, 'p'},
2186 {"publisher-server", required_argument, NULL, 'P'},
2187 {"socketdir", required_argument, NULL, 's'},
2188 {"recovery-timeout", required_argument, NULL, 't'},
2189 {"enable-two-phase", no_argument, NULL, 'T'},
2190 {"subscriber-username", required_argument, NULL, 'U'},
2191 {"verbose", no_argument, NULL, 'v'},
2192 {"version", no_argument, NULL, 'V'},
2193 {"help", no_argument, NULL, '?'},
2194 {"config-file", required_argument, NULL, 1},
2195 {"publication", required_argument, NULL, 2},
2196 {"replication-slot", required_argument, NULL, 3},
2197 {"subscription", required_argument, NULL, 4},
2198 {"clean", required_argument, NULL, 5},
2199 {NULL, 0, NULL, 0}
2200 };
2201
2202 struct CreateSubscriberOptions opt = {0};
2203
2204 int c;
2205 int option_index;
2206
2207 char *pub_base_conninfo;
2208 char *sub_base_conninfo;
2209 char *dbname_conninfo = NULL;
2210
2211 uint64 pub_sysid;
2212 uint64 sub_sysid;
2213 struct stat statbuf;
2214
2215 char *consistent_lsn;
2216
2217 char pidfile[MAXPGPATH];
2218
2219 pg_logging_init(argv[0]);
2221 progname = get_progname(argv[0]);
/*
 * NOTE(review): the text domain passed here is "pg_basebackup", not this
 * program's own name -- presumably a shared message catalog; confirm.
 */
2222 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2223
/* --help and --version are recognized here only as the first argument. */
2224 if (argc > 1)
2225 {
2226 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2227 {
2228 usage();
2229 exit(0);
2230 }
2231 else if (strcmp(argv[1], "-V") == 0
2232 || strcmp(argv[1], "--version") == 0)
2233 {
2234 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2235 exit(0);
2236 }
2237 }
2238
2239 /* Default settings */
2240 subscriber_dir = NULL;
2241 opt.config_file = NULL;
2242 opt.pub_conninfo_str = NULL;
2243 opt.socket_dir = NULL;
2245 opt.sub_username = NULL;
2246 opt.two_phase = false;
2248 {
2249 0
2250 };
2251 opt.recovery_timeout = 0;
2252 opt.all_dbs = false;
2253
2254 /*
2255 * Don't allow it to be run as root. It uses pg_ctl which does not allow
2256 * it either.
2257 */
2258#ifndef WIN32
2259 if (geteuid() == 0)
2260 {
2261 pg_log_error("cannot be executed by \"root\"");
2262 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2263 progname);
2264 exit(1);
2265 }
2266#endif
2267
2269
/*
 * Option parsing.  Long-only options use the small integer codes 1..5
 * assigned in long_options above.
 */
2270 while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
2271 long_options, &option_index)) != -1)
2272 {
2273 switch (c)
2274 {
2275 case 'a':
2276 opt.all_dbs = true;
2277 break;
2278 case 'd':
2280 {
2282 num_dbs++;
2283 }
2284 else
2285 pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2286 break;
2287 case 'D':
2290 break;
2291 case 'n':
2292 dry_run = true;
2293 break;
2294 case 'p':
2295 opt.sub_port = pg_strdup(optarg);
2296 break;
2297 case 'P':
2299 break;
2300 case 's':
2303 break;
/*
 * NOTE(review): atoi() gives no error indication for non-numeric input;
 * no validation of the timeout value is visible in this listing --
 * confirm whether it is checked elsewhere.
 */
2304 case 't':
2305 opt.recovery_timeout = atoi(optarg);
2306 break;
2307 case 'T':
2308 opt.two_phase = true;
2309 break;
2310 case 'U':
2312 break;
2313 case 'v':
2315 break;
2316 case 1:
2318 break;
2319 case 2:
2321 {
2323 num_pubs++;
2324 }
2325 else
2326 pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
2327 break;
2328 case 3:
2330 {
2332 num_replslots++;
2333 }
2334 else
2335 pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2336 break;
2337 case 4:
2339 {
2341 num_subs++;
2342 }
2343 else
2344 pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2345 break;
2346 case 5:
2349 else
2350 pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
2351 break;
2352 default:
2353 /* getopt_long already emitted a complaint */
2354 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2355 exit(1);
2356 }
2357 }
2358
2359 /* Validate that --all is not used with incompatible options */
2360 if (opt.all_dbs)
2361 {
2362 char *bad_switch = NULL;
2363
/* Only the first conflicting option found is reported. */
2364 if (num_dbs > 0)
2365 bad_switch = "--database";
2366 else if (num_pubs > 0)
2367 bad_switch = "--publication";
2368 else if (num_replslots > 0)
2369 bad_switch = "--replication-slot";
2370 else if (num_subs > 0)
2371 bad_switch = "--subscription";
2372
2373 if (bad_switch)
2374 {
2375 pg_log_error("options %s and %s cannot be used together",
2376 bad_switch, "-a/--all");
2377 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2378 exit(1);
2379 }
2380 }
2381
2382 /* Any non-option arguments? */
2383 if (optind < argc)
2384 {
2385 pg_log_error("too many command-line arguments (first is \"%s\")",
2386 argv[optind]);
2387 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2388 exit(1);
2389 }
2390
2391 /* Required arguments */
2392 if (subscriber_dir == NULL)
2393 {
2394 pg_log_error("no subscriber data directory specified");
2395 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2396 exit(1);
2397 }
2398
2399 /* If socket directory is not provided, use the current directory */
2400 if (opt.socket_dir == NULL)
2401 {
2402 char cwd[MAXPGPATH];
2403
2404 if (!getcwd(cwd, MAXPGPATH))
2405 pg_fatal("could not determine current directory");
2406 opt.socket_dir = pg_strdup(cwd);
2408 }
2409
2410 /*
2411 * Parse connection string. Build a base connection string that might be
2412 * reused by multiple databases.
2413 */
2414 if (opt.pub_conninfo_str == NULL)
2415 {
2416 /*
2417 * TODO use primary_conninfo (if available) from subscriber and
2418 * extract publisher connection string. Assume that there are
2419 * identical entries for physical and logical replication. If there is
2420 * not, we would fail anyway.
2421 */
2422 pg_log_error("no publisher connection string specified");
2423 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2424 exit(1);
2425 }
2426
2427 if (dry_run)
2428 pg_log_info("Executing in dry-run mode.\n"
2429 "The target directory will not be modified.");
2430
2431 pg_log_info("validating publisher connection string");
/* dbname_conninfo receives the dbname found in the connection string, if any. */
2432 pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2433 &dbname_conninfo);
2434 if (pub_base_conninfo == NULL)
2435 exit(1);
2436
2437 pg_log_info("validating subscriber connection string");
2438 sub_base_conninfo = get_sub_conninfo(&opt);
2439
2440 /*
2441 * Fetch all databases from the source (publisher) and treat them as if
2442 * the user specified has multiple --database options, one for each source
2443 * database.
2444 */
2445 if (opt.all_dbs)
2446 {
2447 bool dbnamespecified = (dbname_conninfo != NULL);
2448
2449 get_publisher_databases(&opt, dbnamespecified);
2450 }
2451
2452 if (opt.database_names.head == NULL)
2453 {
2454 pg_log_info("no database was specified");
2455
2456 /*
2457 * Try to obtain the dbname from the publisher conninfo. If dbname
2458 * parameter is not available, error out.
2459 */
2460 if (dbname_conninfo)
2461 {
2462 simple_string_list_append(&opt.database_names, dbname_conninfo);
2463 num_dbs++;
2464
2465 pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2466 dbname_conninfo);
2467 }
2468 else
2469 {
2470 pg_log_error("no database name specified");
2471 pg_log_error_hint("Try \"%s --help\" for more information.",
2472 progname);
2473 exit(1);
2474 }
2475 }
2476
2477 /* Number of object names must match number of databases */
2478 if (num_pubs > 0 && num_pubs != num_dbs)
2479 {
2480 pg_log_error("wrong number of publication names specified");
2481 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2482 num_pubs, num_dbs);
2483 exit(1);
2484 }
2485 if (num_subs > 0 && num_subs != num_dbs)
2486 {
2487 pg_log_error("wrong number of subscription names specified");
2488 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2489 num_subs, num_dbs);
2490 exit(1);
2491 }
2492 if (num_replslots > 0 && num_replslots != num_dbs)
2493 {
2494 pg_log_error("wrong number of replication slot names specified");
2495 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2497 exit(1);
2498 }
2499
2500 /* Verify the object types specified for removal from the subscriber */
2501 for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2502 {
/* The comparison is case-insensitive; "publications" is the only accepted value. */
2503 if (pg_strcasecmp(cell->val, "publications") == 0)
2505 else
2506 {
2507 pg_log_error("invalid object type \"%s\" specified for %s",
2508 cell->val, "--clean");
2509 pg_log_error_hint("The valid value is: \"%s\"", "publications");
2510 exit(1);
2511 }
2512 }
2513
2514 /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2515 pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2516 pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2517
2518 /* Rudimentary check for a data directory */
2520
2522
2523 /*
2524 * Store database information for publisher and subscriber. It should be
2525 * called before atexit() because its return is used in the
2526 * cleanup_objects_atexit().
2527 */
2528 dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2529
2530 /* Register a function to clean up objects in case of failure */
2531 atexit(cleanup_objects_atexit);
2532
2533 /*
2534 * Check if the subscriber data directory has the same system identifier
2535 * than the publisher data directory.
2536 */
2538 sub_sysid = get_standby_sysid(subscriber_dir);
2539 if (pub_sysid != sub_sysid)
2540 pg_fatal("subscriber data directory is not a copy of the source database cluster");
2541
2542 /* Subscriber PID file */
2543 snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2544
2545 /*
2546 * The standby server must not be running. If the server is started under
2547 * service manager and pg_createsubscriber stops it, the service manager
2548 * might react to this action and start the server again. Therefore,
2549 * refuse to proceed if the server is running to avoid possible failures.
2550 */
2551 if (stat(pidfile, &statbuf) == 0)
2552 {
2553 pg_log_error("standby server is running");
2554 pg_log_error_hint("Stop the standby server and try again.");
2555 exit(1);
2556 }
2557
2558 /*
2559 * Start a short-lived standby server with temporary parameters (provided
2560 * by command-line options). The goal is to avoid connections during the
2561 * transformation steps.
2562 */
2563 pg_log_info("starting the standby server with command-line options");
2564 start_standby_server(&opt, true, false);
2565
2566 /* Check if the standby server is ready for logical replication */
2568
2569 /* Check if the primary server is ready for logical replication */
2571
2572 /*
2573 * Stop the target server. The recovery process requires that the server
2574 * reaches a consistent state before targeting the recovery stop point.
2575 * Make sure a consistent state is reached (stop the target server
2576 * guarantees it) *before* creating the replication slots in
2577 * setup_publisher().
2578 */
2579 pg_log_info("stopping the subscriber");
2581
2582 /* Create the required objects for each database on publisher */
2583 consistent_lsn = setup_publisher(dbinfos.dbinfo);
2584
2585 /* Write the required recovery parameters */
2586 setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2587
2588 /*
2589 * Start subscriber so the recovery parameters will take effect. Wait
2590 * until accepting connections. We don't want to start logical replication
2591 * during setup.
2592 */
2593 pg_log_info("starting the subscriber");
2594 start_standby_server(&opt, true, true);
2595
2596 /* Waiting the subscriber to be promoted */
2598
2599 /*
2600 * Create the subscription for each database on subscriber. It does not
2601 * enable it immediately because it needs to adjust the replication start
2602 * point to the LSN reported by setup_publisher(). It also cleans up
2603 * publications created by this tool and replication to the standby.
2604 */
2605 setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2606
2607 /* Remove primary_slot_name if it exists on primary */
2609
2610 /* Remove failover replication slots if they exist on subscriber */
2612
2613 /* Stop the subscriber */
2614 pg_log_info("stopping the subscriber");
2616
2617 /* Change system identifier from subscriber */
2619
/*
 * NOTE(review): presumably read by cleanup_objects_atexit() so that a
 * normal exit does not undo the work above -- confirm.
 */
2620 success = true;
2621
2622 pg_log_info("Done!");
2623
2624 return 0;
2625}
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1199
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:430
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:25
#define required_argument
Definition: getopt_long.h:26
void pg_logging_increase_verbosity(void)
Definition: logging.c:185
void pg_logging_init(const char *argv0)
Definition: logging.c:83
void pg_logging_set_level(enum pg_log_level new_level)
Definition: logging.c:176
@ PG_LOG_WARNING
Definition: logging.h:38
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
static char * pg_ctl_path
static char * get_exec_path(const char *argv0, const char *progname)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static void check_data_directory(const char *datadir)
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)
static char * pg_resetwal_path
static void usage(void)
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
PGDLLIMPORT int optind
Definition: getopt.c:51
PGDLLIMPORT char * optarg
Definition: getopt.c:53
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:32
void canonicalize_path(char *path)
Definition: path.c:337
const char * get_progname(const char *argv0)
Definition: path.c:652
char * c
void get_restricted_token(void)
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition: simple_list.c:87
struct SimpleStringList SimpleStringList
SimpleStringList objecttypes_to_clean
SimpleStringList replslot_names
struct SimpleStringListCell * next
Definition: simple_list.h:34
SimpleStringListCell * head
Definition: simple_list.h:42

References CreateSubscriberOptions::all_dbs, canonicalize_path(), check_data_directory(), check_publisher(), check_subscriber(), cleanup_objects_atexit(), CreateSubscriberOptions::config_file, CreateSubscriberOptions::database_names, LogicalRepInfos::dbinfo, dbinfos, DEFAULT_SUB_PORT, drop_failover_replication_slots(), drop_primary_replication_slot(), dry_run, get_base_conninfo(), get_exec_path(), get_primary_sysid(), get_progname(), get_publisher_databases(), get_restricted_token(), get_standby_sysid(), get_sub_conninfo(), getopt_long(), SimpleStringList::head, MAXPGPATH, modify_subscriber_sysid(), SimpleStringListCell::next, no_argument, num_dbs, num_pubs, num_replslots, num_subs, OBJECTTYPE_PUBLICATIONS, CreateSubscriberOptions::objecttypes_to_clean, LogicalRepInfos::objecttypes_to_clean, optarg, optind, pg_ctl_path, pg_fatal, pg_log_error, pg_log_error_detail, pg_log_error_hint, pg_log_info, PG_LOG_WARNING, pg_logging_increase_verbosity(), pg_logging_init(), pg_logging_set_level(), pg_resetwal_path, pg_strcasecmp(), pg_strdup(), PG_TEXTDOMAIN, primary_slot_name, progname, CreateSubscriberOptions::pub_conninfo_str, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, CreateSubscriberOptions::recovery_timeout, CreateSubscriberOptions::replslot_names, required_argument, set_pglocale_pgservice(), setup_publisher(), setup_recovery(), setup_subscriber(), simple_string_list_append(), simple_string_list_member(), snprintf, CreateSubscriberOptions::socket_dir, start_standby_server(), stat, stop_standby_server(), store_pub_sub_info(), CreateSubscriberOptions::sub_names, CreateSubscriberOptions::sub_port, CreateSubscriberOptions::sub_username, LogicalRepInfo::subconninfo, subscriber_dir, success, CreateSubscriberOptions::two_phase, LogicalRepInfos::two_phase, usage(), and wait_for_end_recovery().

◆ modify_subscriber_sysid()

static void modify_subscriber_sysid ( const struct CreateSubscriberOptions *opt )
static

Definition at line 699 of file pg_createsubscriber.c.

700{
701 ControlFileData *cf;
702 bool crc_ok;
703 struct timeval tv;
704
705 char *cmd_str;
706
707 pg_log_info("modifying system identifier of subscriber");
708
709 cf = get_controlfile(subscriber_dir, &crc_ok);
710 if (!crc_ok)
711 pg_fatal("control file appears to be corrupt");
712
713 /*
714 * Select a new system identifier.
715 *
716 * XXX this code was extracted from BootStrapXLOG().
717 */
718 gettimeofday(&tv, NULL);
719 cf->system_identifier = ((uint64) tv.tv_sec) << 32;
720 cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
721 cf->system_identifier |= getpid() & 0xFFF;
722
723 if (dry_run)
724 pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
726 else
727 {
729 pg_log_info("system identifier is %" PRIu64 " on subscriber",
731 }
732
733 if (dry_run)
734 pg_log_info("dry-run: would run pg_resetwal on the subscriber");
735 else
736 pg_log_info("running pg_resetwal on the subscriber");
737
738 cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
740
741 pg_log_debug("pg_resetwal command is: %s", cmd_str);
742
743 if (!dry_run)
744 {
745 int rc = system(cmd_str);
746
747 if (rc == 0)
748 pg_log_info("successfully reset WAL on the subscriber");
749 else
750 pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
751 }
752
753 pg_free(cf);
754}
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
#define DEVNULL
Definition: port.h:161
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:33
int gettimeofday(struct timeval *tp, void *tzp)

References DEVNULL, dry_run, get_controlfile(), gettimeofday(), pg_fatal, pg_free(), pg_log_debug, pg_log_info, pg_resetwal_path, psprintf(), subscriber_dir, ControlFileData::system_identifier, update_controlfile(), and wait_result_to_str().

Referenced by main().

◆ pg_ctl_status()

static void pg_ctl_status ( const char *  pg_ctl_cmd,
int  rc 
)
static

Definition at line 1584 of file pg_createsubscriber.c.

/*
 * Report the outcome of a pg_ctl invocation.
 *
 * pg_ctl_cmd is the command line that was executed; it is only used in the
 * error detail message.  rc is the status of that command (callers pass the
 * value returned by system()).
 *
 * A zero status does nothing.  Any other status is decoded with the wait
 * status macros, logged as an error together with the failed command, and
 * the program exits with code 1.
 */
1585{
1586 if (rc != 0)
1587 {
1588 if (WIFEXITED(rc))
1589 {
1590 pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
1591 }
1592 else if (WIFSIGNALED(rc))
1593 {
1594#if defined(WIN32)
1595 pg_log_error("pg_ctl was terminated by exception 0x%X",
1596 WTERMSIG(rc));
1597 pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1598#else
1599 pg_log_error("pg_ctl was terminated by signal %d: %s",
1600 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1601#endif
1602 }
1603 else
1604 {
1605 pg_log_error("pg_ctl exited with unrecognized status %d", rc);
1606 }
1607
1608 pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
1609 exit(1);
1610 }
1611}
const char * pg_strsignal(int signum)
Definition: pgstrsignal.c:39
#define WIFEXITED(w)
Definition: win32_port.h:150
#define WIFSIGNALED(w)
Definition: win32_port.h:151
#define WTERMSIG(w)
Definition: win32_port.h:153
#define WEXITSTATUS(w)
Definition: win32_port.h:152

References pg_log_error, pg_log_error_detail, pg_strsignal(), WEXITSTATUS, WIFEXITED, WIFSIGNALED, and WTERMSIG.

Referenced by start_standby_server(), and stop_standby_server().

◆ server_is_in_recovery()

static bool server_is_in_recovery ( PGconn *conn )
static

Definition at line 932 of file pg_createsubscriber.c.

933{
934 PGresult *res;
935 int ret;
936
937 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
938
940 {
941 pg_log_error("could not obtain recovery progress: %s",
944 }
945
946
947 ret = strcmp("t", PQgetvalue(res, 0, 0));
948
949 PQclear(res);
950
951 return ret == 0;
952}

References conn, disconnect_database(), pg_log_error, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, and PQresultStatus.

Referenced by check_publisher(), check_subscriber(), and wait_for_end_recovery().

◆ set_replication_progress()

static void set_replication_progress ( PGconn *conn,
const struct LogicalRepInfo *dbinfo,
const char *  lsn 
)
static

Definition at line 1990 of file pg_createsubscriber.c.

1991{
1993 PGresult *res;
1994 Oid suboid;
1995 char *subname;
1996 char *dbname;
1997 char *originname;
1998 char *lsnstr;
1999
2000 Assert(conn != NULL);
2001
2002 subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
2003 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
2004
2006 "SELECT s.oid FROM pg_catalog.pg_subscription s "
2007 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
2008 "WHERE s.subname = %s AND d.datname = %s",
2009 subname, dbname);
2010
2011 res = PQexec(conn, str->data);
2012 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2013 {
2014 pg_log_error("could not obtain subscription OID: %s",
2017 }
2018
2019 if (PQntuples(res) != 1 && !dry_run)
2020 {
2021 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
2022 PQntuples(res), 1);
2024 }
2025
2026 if (dry_run)
2027 {
2028 suboid = InvalidOid;
2029 lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
2030 }
2031 else
2032 {
2033 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
2034 lsnstr = psprintf("%s", lsn);
2035 }
2036
2037 PQclear(res);
2038
2039 /*
2040 * The origin name is defined as pg_%u. %u is the subscription OID. See
2041 * ApplyWorkerMain().
2042 */
2043 originname = psprintf("pg_%u", suboid);
2044
2045 if (dry_run)
2046 pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2047 originname, lsnstr, dbinfo->dbname);
2048 else
2049 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
2050 originname, lsnstr, dbinfo->dbname);
2051
2054 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
2055 originname, lsnstr);
2056
2057 pg_log_debug("command is: %s", str->data);
2058
2059 if (!dry_run)
2060 {
2061 res = PQexec(conn, str->data);
2062 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2063 {
2064 pg_log_error("could not set replication progress for subscription \"%s\": %s",
2065 dbinfo->subname, PQresultErrorMessage(res));
2067 }
2068 PQclear(res);
2069 }
2070
2073 pg_free(originname);
2074 pg_free(lsnstr);
2076}
#define InvalidOid
Definition: postgres_ext.h:37
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, InvalidOid, InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, psprintf(), resetPQExpBuffer(), str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ setup_publisher()

static char * setup_publisher ( struct LogicalRepInfo *dbinfo )
static

Definition at line 844 of file pg_createsubscriber.c.

845{
846 char *lsn = NULL;
847
848 pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
849
850 for (int i = 0; i < num_dbs; i++)
851 {
852 PGconn *conn;
853 char *genname = NULL;
854
855 conn = connect_database(dbinfo[i].pubconninfo, true);
856
857 /*
858 * If an object name was not specified via command-line options, assign
859 * a generated object name. The replication slot has a different rule.
860 * The subscription name is assigned to the replication slot name if
861 * no replication slot is specified. It follows the same rule as
862 * CREATE SUBSCRIPTION.
863 */
864 if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
865 genname = generate_object_name(conn);
866 if (num_pubs == 0)
867 dbinfo[i].pubname = pg_strdup(genname);
868 if (num_subs == 0)
869 dbinfo[i].subname = pg_strdup(genname);
870 if (num_replslots == 0)
871 dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
872
873 if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
874 {
875 /* Reuse existing publication on publisher. */
876 pg_log_info("use existing publication \"%s\" in database \"%s\"",
877 dbinfo[i].pubname, dbinfo[i].dbname);
878 /* Don't remove pre-existing publication if an error occurs. */
879 dbinfo[i].made_publication = false;
880 }
881 else
882 {
883 /*
884 * Create publication on publisher. This step should be executed
885 * *before* promoting the subscriber to avoid any transactions
886 * between consistent LSN and the new publication rows (such
887 * transactions wouldn't see the new publication rows resulting in
888 * an error).
889 */
890 create_publication(conn, &dbinfo[i]);
891 }
892
893 /* Create replication slot on publisher */
894 if (lsn)
895 pg_free(lsn);
896 lsn = create_logical_replication_slot(conn, &dbinfo[i]);
897 if (lsn == NULL && !dry_run)
898 exit(1);
899
900 /*
901 * Since we are using the LSN returned by the last replication slot as
902 * recovery_target_lsn, this LSN is ahead of the current WAL position
903 * and the recovery waits until the publisher writes a WAL record to
904 * reach the target and ends the recovery. On idle systems, this wait
905 * time is unpredictable and could lead to failure in promoting the
906 * subscriber. To avoid that, insert a harmless WAL record.
907 */
908 if (i == num_dbs - 1 && !dry_run)
909 {
910 PGresult *res;
911
912 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
914 {
915 pg_log_error("could not write an additional WAL record: %s",
918 }
919 PQclear(res);
920 }
921
923 }
924
925 return lsn;
926}
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
static bool find_publication(PGconn *conn, const char *pubname, const char *dbname)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static char * generate_object_name(PGconn *conn)
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition: pg_prng.c:89

References conn, connect_database(), create_logical_replication_slot(), create_publication(), dbname, disconnect_database(), dry_run, find_publication(), generate_object_name(), i, LogicalRepInfo::made_publication, num_dbs, num_pubs, num_replslots, num_subs, pg_free(), pg_log_error, pg_log_info, pg_prng_seed(), pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQresultErrorMessage, PQresultStatus, prng_state, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, LogicalRepInfo::subname, and subname.

Referenced by main().

◆ setup_recovery()

static void setup_recovery ( const struct LogicalRepInfo *dbinfo,
const char *  datadir,
const char *  lsn 
)
static

Definition at line 1320 of file pg_createsubscriber.c.

1321{
1322 PGconn *conn;
1324
1325 /*
1326 * Although the recovery parameters will be written to the subscriber,
1327 * use a publisher connection. The primary_conninfo is generated using the
1328 * connection settings.
1329 */
1330 conn = connect_database(dbinfo[0].pubconninfo, true);
1331
1332 /*
1333 * Write recovery parameters.
1334 *
1335 * The subscriber is not running yet. In dry run mode, the recovery
1336 * parameters *won't* be written. An invalid LSN is used for printing
1337 * purposes. Additional recovery parameters are added here. It avoids
1338 * unexpected behavior such as end of recovery as soon as a consistent
1339 * state is reached (recovery_target) and failure due to multiple recovery
1340 * targets (name, time, xid, LSN).
1341 */
1343 appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1345 "recovery_target_timeline = 'latest'\n");
1346
1347 /*
1348 * Set recovery_target_inclusive = false to avoid reapplying the
1349 * transaction committed at 'lsn' after subscription is enabled. This is
1350 * because the provided 'lsn' is also used as the replication start point
1351 * for the subscription. So, the server can send the transaction committed
1352 * at that 'lsn' after replication is started which can lead to applying
1353 * the same transaction twice if we keep recovery_target_inclusive = true.
1354 */
1356 "recovery_target_inclusive = false\n");
1358 "recovery_target_action = promote\n");
1359 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1360 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1361 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1362
1363 if (dry_run)
1364 {
1365 appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
1367 "recovery_target_lsn = '%X/%08X'\n",
1369 }
1370 else
1371 {
1372 appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1373 lsn);
1374 }
1375
1376 pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1377
1378 if (!dry_run)
1379 {
1380 char conf_filename[MAXPGPATH];
1381 FILE *fd;
1382
1383 /* Write the recovery parameters to INCLUDED_CONF_FILE */
1384 snprintf(conf_filename, MAXPGPATH, "%s/%s", datadir,
1386 fd = fopen(conf_filename, "w");
1387 if (fd == NULL)
1388 pg_fatal("could not open file \"%s\": %m", conf_filename);
1389
1390 if (fwrite(recoveryconfcontents->data, recoveryconfcontents->len, 1, fd) != 1)
1391 pg_fatal("could not write to file \"%s\": %m", conf_filename);
1392
1393 fclose(fd);
1394 recovery_params_set = true;
1395
1396 /* Include conditionally the recovery parameters. */
1399 "include_if_exists '" INCLUDED_CONF_FILE "'\n");
1401 }
1402
1403 disconnect_database(conn, false);
1404}
static PQExpBuffer recoveryconfcontents
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
Definition: recovery_gen.c:125
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
Definition: recovery_gen.c:28

References appendPQExpBuffer(), appendPQExpBufferStr(), conn, connect_database(), PQExpBufferData::data, datadir, disconnect_database(), dry_run, fd(), GenerateRecoveryConfig(), INCLUDED_CONF_FILE, InvalidXLogRecPtr, PQExpBufferData::len, LSN_FORMAT_ARGS, MAXPGPATH, pg_fatal, pg_log_debug, recovery_params_set, recoveryconfcontents, resetPQExpBuffer(), snprintf, and WriteRecoveryConfig().

Referenced by main().

◆ setup_subscriber()

static void setup_subscriber ( struct LogicalRepInfo *dbinfo,
const char *  consistent_lsn 
)
static

Definition at line 1284 of file pg_createsubscriber.c.

1285{
1286 for (int i = 0; i < num_dbs; i++)
1287 {
1288 PGconn *conn;
1289
1290 /* Connect to subscriber. */
1291 conn = connect_database(dbinfo[i].subconninfo, true);
1292
1293 /*
1294 * We don't need the pre-existing subscriptions on the newly formed
1295 * subscriber. They can connect to other publisher nodes and either
1296 * get some unwarranted data or can lead to ERRORs in connecting to
1297 * such nodes.
1298 */
1300
1301 /* Check and drop the required publications in the given database. */
1303
1304 create_subscription(conn, &dbinfo[i]);
1305
1306 /* Set the replication progress to the correct LSN */
1307 set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1308
1309 /* Enable subscription */
1310 enable_subscription(conn, &dbinfo[i]);
1311
1312 disconnect_database(conn, false);
1313 }
1314}
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)

References check_and_drop_existing_subscriptions(), check_and_drop_publications(), conn, connect_database(), create_subscription(), disconnect_database(), enable_subscription(), i, num_dbs, and set_replication_progress().

Referenced by main().

◆ start_standby_server()

static void start_standby_server ( const struct CreateSubscriberOptions *opt,
bool  restricted_access,
bool  restrict_logical_worker 
)
static

Definition at line 1614 of file pg_createsubscriber.c.

/*
 * Start the standby (target) server located in subscriber_dir by running
 * pg_ctl through system().
 *
 * The command always passes sync_replication_slots=off and
 * idle_replication_slot_timeout=0.  When restricted_access is true the
 * server is started on opt->sub_port and, on non-Windows builds, with an
 * empty listen_addresses, unix_socket_permissions=0700 and (if set)
 * opt->socket_dir as unix_socket_directories.  When opt->config_file is set
 * it is passed as config_file.  When restrict_logical_worker is true,
 * max_logical_replication_workers is set to 0.
 *
 * The status of the command is handed to pg_ctl_status(); afterwards
 * standby_running is set to true.
 *
 * NOTE(review): subscriber_dir goes through appendShellString(), but
 * opt->sub_port, opt->socket_dir and opt->config_file are interpolated into
 * the command line as-is -- confirm these values cannot contain quotes or
 * other shell metacharacters.
 */
1616{
1617 PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1618 int rc;
1619
1620 appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1621 appendShellString(pg_ctl_cmd, subscriber_dir);
1622 appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1623
1624 /* Prevent unintended slot invalidation */
1625 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1626
1627 if (restricted_access)
1628 {
1629 appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1630#if !defined(WIN32)
1631
1632 /*
1633 * An empty listen_addresses list means the server does not listen on
1634 * any IP interfaces; only Unix-domain sockets can be used to connect
1635 * to the server. Prevent external connections to minimize the chance
1636 * of failure.
1637 */
/* The -o "..." argument opened here is closed by the '"' appended below. */
1638 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1639 if (opt->socket_dir)
1640 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1641 opt->socket_dir);
1642 appendPQExpBufferChar(pg_ctl_cmd, '"');
1643#endif
1644 }
1645 if (opt->config_file != NULL)
1646 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1647 opt->config_file);
1648
1649 /* Prevent logical replication from starting, if requested */
1650 if (restrict_logical_worker)
1651 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1652
1653 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1654 rc = system(pg_ctl_cmd->data);
1655 pg_ctl_status(pg_ctl_cmd->data, rc);
1656 standby_running = true;
1657 destroyPQExpBuffer(pg_ctl_cmd);
1658 pg_log_info("server was started");
1659}
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
void appendShellString(PQExpBuffer buf, const char *str)
Definition: string_utils.c:582

References appendPQExpBuffer(), appendPQExpBufferChar(), appendPQExpBufferStr(), appendShellString(), CreateSubscriberOptions::config_file, createPQExpBuffer(), PQExpBufferData::data, destroyPQExpBuffer(), pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, CreateSubscriberOptions::socket_dir, standby_running, CreateSubscriberOptions::sub_port, and subscriber_dir.

Referenced by main().

◆ stop_standby_server()

static void stop_standby_server ( const char *  datadir)
static

Definition at line 1662 of file pg_createsubscriber.c.

/*
 * Stop the server whose data directory is datadir by running
 * "pg_ctl stop -D <datadir> -s" through system().
 *
 * The status of the command is handed to pg_ctl_status(); afterwards
 * standby_running is set to false.
 *
 * NOTE(review): the command string returned by psprintf() is not freed in
 * this function, and datadir is wrapped in plain double quotes rather than
 * passed through appendShellString() as start_standby_server() does for
 * subscriber_dir -- confirm both are intentional.
 */
1663{
1664 char *pg_ctl_cmd;
1665 int rc;
1666
1667 pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1668 datadir);
1669 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1670 rc = system(pg_ctl_cmd);
1671 pg_ctl_status(pg_ctl_cmd, rc);
1672 standby_running = false;
1673 pg_log_info("server was stopped");
1674}

References datadir, pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, psprintf(), and standby_running.

Referenced by cleanup_objects_atexit(), main(), and wait_for_end_recovery().

◆ store_pub_sub_info()

static struct LogicalRepInfo * store_pub_sub_info ( const struct CreateSubscriberOptions *opt,
const char *  pub_base_conninfo,
const char *  sub_base_conninfo 
)
static

Definition at line 504 of file pg_createsubscriber.c.

/*
 * Build the per-database array of publisher/subscriber settings.
 *
 * Allocates num_dbs LogicalRepInfo entries and fills one entry for each cell
 * of opt->database_names, in list order.  For each database the publisher
 * and subscriber connection strings are built by concat_conninfo_dbname()
 * from pub_base_conninfo / sub_base_conninfo and the database name.
 *
 * The publication, subscription and replication slot names are taken from
 * the matching cell of opt->pub_names, opt->sub_names and
 * opt->replslot_names when num_pubs, num_subs or num_replslots is greater
 * than zero; otherwise the field is set to NULL.
 *
 * Returns the allocated array.
 *
 * NOTE(review): the name lists are walked in lockstep with database_names
 * without NULL checks, and entries are written up to the length of
 * database_names -- presumably the caller guarantees that num_dbs matches
 * that length and that each non-empty name list has at least as many
 * entries; verify against main().
 */
507{
508 struct LogicalRepInfo *dbinfo;
509 SimpleStringListCell *pubcell = NULL;
510 SimpleStringListCell *subcell = NULL;
511 SimpleStringListCell *replslotcell = NULL;
512 int i = 0;
513
514 dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
515
516 if (num_pubs > 0)
517 pubcell = opt->pub_names.head;
518 if (num_subs > 0)
519 subcell = opt->sub_names.head;
520 if (num_replslots > 0)
521 replslotcell = opt->replslot_names.head;
522
523 for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
524 {
525 char *conninfo;
526
527 /* Fill publisher attributes */
528 conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
529 dbinfo[i].pubconninfo = conninfo;
530 dbinfo[i].dbname = cell->val;
531 if (num_pubs > 0)
532 dbinfo[i].pubname = pubcell->val;
533 else
534 dbinfo[i].pubname = NULL;
535 if (num_replslots > 0)
536 dbinfo[i].replslotname = replslotcell->val;
537 else
538 dbinfo[i].replslotname = NULL;
539 dbinfo[i].made_replslot = false;
540 dbinfo[i].made_publication = false;
541 /* Fill subscriber attributes */
542 conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
543 dbinfo[i].subconninfo = conninfo;
544 if (num_subs > 0)
545 dbinfo[i].subname = subcell->val;
546 else
547 dbinfo[i].subname = NULL;
548 /* Other fields will be filled later */
549
550 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
551 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
552 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
553 dbinfo[i].pubconninfo);
554 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
555 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
556 dbinfo[i].subconninfo,
557 dbinfos.two_phase ? "true" : "false");
558
/* Advance each user-supplied name list together with the database list */
559 if (num_pubs > 0)
560 pubcell = pubcell->next;
561 if (num_subs > 0)
562 subcell = subcell->next;
563 if (num_replslots > 0)
564 replslotcell = replslotcell->next;
565
566 i++;
567 }
568
569 return dbinfo;
570}
#define pg_malloc_array(type, count)
Definition: fe_memutils.h:56
char val[FLEXIBLE_ARRAY_MEMBER]
Definition: simple_list.h:37

References concat_conninfo_dbname(), CreateSubscriberOptions::database_names, dbinfos, LogicalRepInfo::dbname, SimpleStringList::head, i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, SimpleStringListCell::next, num_dbs, num_pubs, num_replslots, num_subs, pg_log_debug, pg_malloc_array, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, CreateSubscriberOptions::replslot_names, LogicalRepInfo::replslotname, CreateSubscriberOptions::sub_names, LogicalRepInfo::subconninfo, LogicalRepInfo::subname, subname, LogicalRepInfos::two_phase, and SimpleStringListCell::val.

Referenced by main().

◆ usage()

static void usage ( void  )
static

Definition at line 275 of file pg_createsubscriber.c.

/*
 * Print the command-line help text.
 *
 * Each line is passed through the _() translation macro and emitted with
 * printf.  progname and DEFAULT_SUB_PORT are substituted into the text, and
 * the footer uses PACKAGE_BUGREPORT, PACKAGE_NAME and PACKAGE_URL.
 */
276{
277 printf(_("%s creates a new logical replica from a standby server.\n\n"),
278 progname);
279 printf(_("Usage:\n"));
280 printf(_(" %s [OPTION]...\n"), progname);
281 printf(_("\nOptions:\n"));
282 printf(_(" -a, --all create subscriptions for all databases except template\n"
283 " databases and databases that don't allow connections\n"));
284 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
285 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
286 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
287 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
288 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
289 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
290 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
291 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
292 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
293 printf(_(" -v, --verbose output verbose messages\n"));
/* The accepted --clean value is passed as an argument, not embedded in the translatable text */
294 printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
295 " databases on the subscriber; accepts: \"%s\"\n"), "publications");
296 printf(_(" --config-file=FILENAME use specified main server configuration\n"
297 " file when running target cluster\n"));
298 printf(_(" --publication=NAME publication name\n"));
299 printf(_(" --replication-slot=NAME replication slot name\n"));
300 printf(_(" --subscription=NAME subscription name\n"));
301 printf(_(" -V, --version output version information, then exit\n"));
302 printf(_(" -?, --help show this help, then exit\n"));
303 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
304 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
305}
#define _(x)
Definition: elog.c:91
#define printf(...)
Definition: port.h:266

References _, DEFAULT_SUB_PORT, printf, and progname.

Referenced by main().

◆ wait_for_end_recovery()

static void wait_for_end_recovery ( const char *  conninfo,
const struct CreateSubscriberOptions *opt
)
static

Definition at line 1686 of file pg_createsubscriber.c.

1687{
1688 PGconn *conn;
1689 bool ready = false;
1690 int timer = 0;
1691
1692 pg_log_info("waiting for the target server to reach the consistent state");
1693
1694 conn = connect_database(conninfo, true);
1695
1696 for (;;)
1697 {
1698 /* Did the recovery process finish? We're done if so. */
1700 {
1701 ready = true;
1702 recovery_ended = true;
1703 break;
1704 }
1705
1706 /* Bail out after recovery_timeout seconds if this option is set */
1707 if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1708 {
1710 pg_log_error("recovery timed out");
1712 }
1713
1714 /* Keep waiting */
1716 timer += WAIT_INTERVAL;
1717 }
1718
1719 disconnect_database(conn, false);
1720
1721 if (!ready)
1722 pg_fatal("server did not end recovery");
1723
1724 pg_log_info("target server reached the consistent state");
1725 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1726}
#define USECS_PER_SEC
Definition: timestamp.h:134
#define pg_log_info_hint(...)
Definition: logging.h:130
#define WAIT_INTERVAL
void pg_usleep(long microsec)
Definition: signal.c:53

References conn, connect_database(), disconnect_database(), dry_run, pg_fatal, pg_log_error, pg_log_info, pg_log_info_hint, pg_usleep(), recovery_ended, CreateSubscriberOptions::recovery_timeout, server_is_in_recovery(), stop_standby_server(), subscriber_dir, USECS_PER_SEC, and WAIT_INTERVAL.

Referenced by main().

Variable Documentation

◆ dbinfos

◆ dry_run

◆ num_dbs

◆ num_pubs

int num_pubs = 0
static

Definition at line 161 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_replslots

int num_replslots = 0
static

Definition at line 163 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_subs

int num_subs = 0
static

Definition at line 162 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ pg_ctl_path

char* pg_ctl_path = NULL
static

Definition at line 167 of file pg_createsubscriber.c.

Referenced by main(), start_standby_server(), and stop_standby_server().

◆ pg_resetwal_path

char* pg_resetwal_path = NULL
static

Definition at line 168 of file pg_createsubscriber.c.

Referenced by main(), and modify_subscriber_sysid().

◆ primary_slot_name

char* primary_slot_name = NULL
static

Definition at line 154 of file pg_createsubscriber.c.

Referenced by check_subscriber(), drop_primary_replication_slot(), and main().

◆ prng_state

pg_prng_state prng_state
static

Definition at line 165 of file pg_createsubscriber.c.

Referenced by generate_object_name(), and setup_publisher().

◆ progname

const char* progname
static

Definition at line 152 of file pg_createsubscriber.c.

Referenced by get_exec_path(), get_sub_conninfo(), main(), and usage().

◆ recovery_ended

bool recovery_ended = false
static

Definition at line 173 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and wait_for_end_recovery().

◆ recovery_params_set

bool recovery_params_set = false
static

Definition at line 175 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and setup_recovery().

◆ standby_running

bool standby_running = false
static

◆ subscriber_dir

char* subscriber_dir = NULL
static

◆ success

bool success = false
static

Definition at line 157 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and main().