PostgreSQL Source Code git master
pg_createsubscriber.c File Reference
#include "postgres_fe.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/wait.h>
#include "common/connect.h"
#include "common/controldata_utils.h"
#include "common/logging.h"
#include "common/pg_prng.h"
#include "common/restricted_token.h"
#include "datatype/timestamp.h"
#include "fe_utils/recovery_gen.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
#include "fe_utils/version.h"
#include "getopt_long.h"
Include dependency graph for pg_createsubscriber.c:

Go to the source code of this file.

Data Structures

struct  CreateSubscriberOptions
 
struct  LogicalRepInfo
 
struct  LogicalRepInfos
 

Macros

#define DEFAULT_SUB_PORT   "50432"
 
#define OBJECTTYPE_PUBLICATIONS   0x0001
 
#define WAIT_INTERVAL   1 /* 1 second */
 

Functions

static void cleanup_objects_atexit (void)
 
static void usage (void)
 
static char * get_base_conninfo (const char *conninfo, char **dbname)
 
static char * get_sub_conninfo (const struct CreateSubscriberOptions *opt)
 
static char * get_exec_path (const char *argv0, const char *progname)
 
static void check_data_directory (const char *datadir)
 
static char * concat_conninfo_dbname (const char *conninfo, const char *dbname)
 
static struct LogicalRepInfo * store_pub_sub_info (const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
 
static PGconn * connect_database (const char *conninfo, bool exit_on_error)
 
static void disconnect_database (PGconn *conn, bool exit_on_error)
 
static uint64 get_primary_sysid (const char *conninfo)
 
static uint64 get_standby_sysid (const char *datadir)
 
static void modify_subscriber_sysid (const struct CreateSubscriberOptions *opt)
 
static bool server_is_in_recovery (PGconn *conn)
 
static char * generate_object_name (PGconn *conn)
 
static void check_publisher (const struct LogicalRepInfo *dbinfo)
 
static char * setup_publisher (struct LogicalRepInfo *dbinfo)
 
static void check_subscriber (const struct LogicalRepInfo *dbinfo)
 
static void setup_subscriber (struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 
static void setup_recovery (const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
 
static void drop_primary_replication_slot (struct LogicalRepInfo *dbinfo, const char *slotname)
 
static void drop_failover_replication_slots (struct LogicalRepInfo *dbinfo)
 
static char * create_logical_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void drop_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
 
static void pg_ctl_status (const char *pg_ctl_cmd, int rc)
 
static void start_standby_server (const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
 
static void stop_standby_server (const char *datadir)
 
static void wait_for_end_recovery (const char *conninfo, const struct CreateSubscriberOptions *opt)
 
static void create_publication (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void drop_publication (PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
 
static void check_and_drop_publications (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void create_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void set_replication_progress (PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
 
static void enable_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void check_and_drop_existing_subscriptions (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void drop_existing_subscription (PGconn *conn, const char *subname, const char *dbname)
 
static void get_publisher_databases (struct CreateSubscriberOptions *opt, bool dbnamespecified)
 
static void appendConnStrItem (PQExpBuffer buf, const char *keyword, const char *val)
 
int main (int argc, char **argv)
 

Variables

static const char * progname
 
static char * primary_slot_name = NULL
 
static bool dry_run = false
 
static bool success = false
 
static struct LogicalRepInfos dbinfos
 
static int num_dbs = 0
 
static int num_pubs = 0
 
static int num_subs = 0
 
static int num_replslots = 0
 
static pg_prng_state prng_state
 
static char * pg_ctl_path = NULL
 
static char * pg_resetwal_path = NULL
 
static char * subscriber_dir = NULL
 
static bool recovery_ended = false
 
static bool standby_running = false
 

Macro Definition Documentation

◆ DEFAULT_SUB_PORT

#define DEFAULT_SUB_PORT   "50432"

Definition at line 33 of file pg_createsubscriber.c.

◆ OBJECTTYPE_PUBLICATIONS

#define OBJECTTYPE_PUBLICATIONS   0x0001

Definition at line 34 of file pg_createsubscriber.c.

◆ WAIT_INTERVAL

#define WAIT_INTERVAL   1 /* 1 second */

Definition at line 133 of file pg_createsubscriber.c.

Function Documentation

◆ appendConnStrItem()

static void appendConnStrItem ( PQExpBuffer  buf,
const char *  keyword,
const char *  val 
)
static

Definition at line 274 of file pg_createsubscriber.c.

275{
276 if (buf->len > 0)
278 appendPQExpBufferStr(buf, keyword);
281}
long val
Definition: informix.c:689
static char * buf
Definition: pg_test_fsync.c:72
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
void appendConnStrVal(PQExpBuffer buf, const char *str)
Definition: string_utils.c:698

References appendConnStrVal(), appendPQExpBufferChar(), appendPQExpBufferStr(), buf, and val.

Referenced by concat_conninfo_dbname(), get_base_conninfo(), and get_sub_conninfo().

◆ check_and_drop_existing_subscriptions()

static void check_and_drop_existing_subscriptions ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1161 of file pg_createsubscriber.c.

1163{
1165 char *dbname;
1166 PGresult *res;
1167
1168 Assert(conn != NULL);
1169
1170 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1171
1172 appendPQExpBuffer(query,
1173 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1174 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1175 "WHERE d.datname = %s",
1176 dbname);
1177 res = PQexec(conn, query->data);
1178
1179 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1180 {
1181 pg_log_error("could not obtain pre-existing subscriptions: %s",
1184 }
1185
1186 for (int i = 0; i < PQntuples(res); i++)
1188 dbinfo->dbname);
1189
1190 PQclear(res);
1191 destroyPQExpBuffer(query);
1193}
void PQfreemem(void *ptr)
Definition: fe-exec.c:4049
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4399
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2279
Assert(PointerIsAligned(start, uint64))
int i
Definition: isn.c:77
#define PQresultErrorMessage
#define PQgetvalue
Definition: libpq-be-fe.h:253
#define PQclear
Definition: libpq-be-fe.h:245
#define PQresultStatus
Definition: libpq-be-fe.h:247
#define PQntuples
Definition: libpq-be-fe.h:251
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:128
#define pg_log_error(...)
Definition: logging.h:106
static void drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname)
static void disconnect_database(PGconn *conn, bool exit_on_error)
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
char * dbname
Definition: streamutil.c:49
PGconn * conn
Definition: streamutil.c:52

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), PQExpBufferData::data, LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), drop_existing_subscription(), i, pg_log_error, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQntuples, PQresultErrorMessage, and PQresultStatus.

Referenced by setup_subscriber().

◆ check_and_drop_publications()

static void check_and_drop_publications ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1759 of file pg_createsubscriber.c.

1760{
1761 PGresult *res;
1763
1764 Assert(conn != NULL);
1765
1766 if (drop_all_pubs)
1767 {
1768 pg_log_info("dropping all existing publications in database \"%s\"",
1769 dbinfo->dbname);
1770
1771 /* Fetch all publication names */
1772 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1773 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1774 {
1775 pg_log_error("could not obtain publication information: %s",
1777 PQclear(res);
1779 }
1780
1781 /* Drop each publication */
1782 for (int i = 0; i < PQntuples(res); i++)
1783 drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1784 &dbinfo->made_publication);
1785
1786 PQclear(res);
1787 }
1788
1789 /*
1790 * In dry-run mode, we don't create publications, but we still try to drop
1791 * those to provide necessary information to the user.
1792 */
1793 if (!drop_all_pubs || dry_run)
1794 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1795 &dbinfo->made_publication);
1796}
#define pg_log_info(...)
Definition: logging.h:124
static struct LogicalRepInfos dbinfos
static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
static bool dry_run
#define OBJECTTYPE_PUBLICATIONS

References Assert(), conn, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), dry_run, i, LogicalRepInfo::made_publication, OBJECTTYPE_PUBLICATIONS, LogicalRepInfos::objecttypes_to_clean, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, and LogicalRepInfo::pubname.

Referenced by setup_subscriber().

◆ check_data_directory()

static void check_data_directory ( const char *  datadir)
static

Definition at line 402 of file pg_createsubscriber.c.

403{
404 struct stat statbuf;
405 uint32 major_version;
406 char *version_str;
407
408 pg_log_info("checking if directory \"%s\" is a cluster data directory",
409 datadir);
410
411 if (stat(datadir, &statbuf) != 0)
412 {
413 if (errno == ENOENT)
414 pg_fatal("data directory \"%s\" does not exist", datadir);
415 else
416 pg_fatal("could not access directory \"%s\": %m", datadir);
417 }
418
419 /*
420 * Retrieve the contents of this cluster's PG_VERSION. We require
421 * compatibility with the same major version as the one this tool is
422 * compiled with.
423 */
424 major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
425 if (major_version != PG_MAJORVERSION_NUM)
426 {
427 pg_log_error("data directory is of wrong version");
428 pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
429 "PG_VERSION", version_str, PG_MAJORVERSION);
430 exit(1);
431 }
432}
uint32_t uint32
Definition: c.h:541
uint32 get_pg_version(const char *datadir, char **version_str)
Definition: version.c:44
#define pg_log_error_detail(...)
Definition: logging.h:109
#define pg_fatal(...)
char * datadir
#define GET_PG_MAJORVERSION_NUM(v)
Definition: version.h:19
#define stat
Definition: win32_port.h:274

References datadir, GET_PG_MAJORVERSION_NUM, get_pg_version(), pg_fatal, pg_log_error, pg_log_error_detail, pg_log_info, and stat.

Referenced by main().

◆ check_publisher()

static void check_publisher ( const struct LogicalRepInfo *  dbinfo)
static

Definition at line 877 of file pg_createsubscriber.c.

878{
879 PGconn *conn;
880 PGresult *res;
881 bool failed = false;
882
883 char *wal_level;
884 int max_repslots;
885 int cur_repslots;
886 int max_walsenders;
887 int cur_walsenders;
888 int max_prepared_transactions;
889 char *max_slot_wal_keep_size;
890
891 pg_log_info("checking settings on publisher");
892
893 conn = connect_database(dbinfo[0].pubconninfo, true);
894
895 /*
896 * If the primary server is in recovery (i.e. cascading replication),
897 * objects (publication) cannot be created because it is read only.
898 */
900 {
901 pg_log_error("primary server cannot be in recovery");
903 }
904
905 /*------------------------------------------------------------------------
906 * Logical replication requires a few parameters to be set on publisher.
907 * Since these parameters are not a requirement for physical replication,
908 * we should check it to make sure it won't fail.
909 *
910 * - wal_level = logical
911 * - max_replication_slots >= current + number of dbs to be converted
912 * - max_wal_senders >= current + number of dbs to be converted
913 * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
914 * -----------------------------------------------------------------------
915 */
916 res = PQexec(conn,
917 "SELECT pg_catalog.current_setting('wal_level'),"
918 " pg_catalog.current_setting('max_replication_slots'),"
919 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
920 " pg_catalog.current_setting('max_wal_senders'),"
921 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
922 " pg_catalog.current_setting('max_prepared_transactions'),"
923 " pg_catalog.current_setting('max_slot_wal_keep_size')");
924
926 {
927 pg_log_error("could not obtain publisher settings: %s",
930 }
931
932 wal_level = pg_strdup(PQgetvalue(res, 0, 0));
933 max_repslots = atoi(PQgetvalue(res, 0, 1));
934 cur_repslots = atoi(PQgetvalue(res, 0, 2));
935 max_walsenders = atoi(PQgetvalue(res, 0, 3));
936 cur_walsenders = atoi(PQgetvalue(res, 0, 4));
937 max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
938 max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
939
940 PQclear(res);
941
942 pg_log_debug("publisher: wal_level: %s", wal_level);
943 pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
944 pg_log_debug("publisher: current replication slots: %d", cur_repslots);
945 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
946 pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
947 pg_log_debug("publisher: max_prepared_transactions: %d",
948 max_prepared_transactions);
949 pg_log_debug("publisher: max_slot_wal_keep_size: %s",
950 max_slot_wal_keep_size);
951
953
954 if (strcmp(wal_level, "logical") != 0)
955 {
956 pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
957 failed = true;
958 }
959
960 if (max_repslots - cur_repslots < num_dbs)
961 {
962 pg_log_error("publisher requires %d replication slots, but only %d remain",
963 num_dbs, max_repslots - cur_repslots);
964 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
965 "max_replication_slots", cur_repslots + num_dbs);
966 failed = true;
967 }
968
969 if (max_walsenders - cur_walsenders < num_dbs)
970 {
971 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
972 num_dbs, max_walsenders - cur_walsenders);
973 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
974 "max_wal_senders", cur_walsenders + num_dbs);
975 failed = true;
976 }
977
978 if (max_prepared_transactions != 0 && !dbinfos.two_phase)
979 {
980 pg_log_warning("two_phase option will not be enabled for replication slots");
981 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
982 "Prepared transactions will be replicated at COMMIT PREPARED.");
983 pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
984 }
985
986 /*
987 * In dry-run mode, validate 'max_slot_wal_keep_size'. If this parameter
988 * is set to a non-default value, it may cause replication failures due to
989 * required WAL files being prematurely removed.
990 */
991 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
992 {
993 pg_log_warning("required WAL could be removed from the publisher");
994 pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
995 "max_slot_wal_keep_size");
996 }
997
999
1000 if (failed)
1001 exit(1);
1002}
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_warning_hint(...)
Definition: logging.h:121
#define pg_log_warning_detail(...)
Definition: logging.h:118
#define pg_log_debug(...)
Definition: logging.h:133
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
#define pg_log_warning(...)
Definition: pgfnames.c:24
int wal_level
Definition: xlog.c:133

References conn, connect_database(), dbinfos, disconnect_database(), dry_run, num_dbs, pg_free(), pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_log_warning, pg_log_warning_detail, pg_log_warning_hint, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, PQresultStatus, server_is_in_recovery(), LogicalRepInfos::two_phase, and wal_level.

Referenced by main().

◆ check_subscriber()

static void check_subscriber ( const struct LogicalRepInfo *  dbinfo)
static

Definition at line 1016 of file pg_createsubscriber.c.

1017{
1018 PGconn *conn;
1019 PGresult *res;
1020 bool failed = false;
1021
1022 int max_lrworkers;
1023 int max_reporigins;
1024 int max_wprocs;
1025
1026 pg_log_info("checking settings on subscriber");
1027
1028 conn = connect_database(dbinfo[0].subconninfo, true);
1029
1030 /* The target server must be a standby */
1032 {
1033 pg_log_error("target server must be a standby");
1035 }
1036
1037 /*------------------------------------------------------------------------
1038 * Logical replication requires a few parameters to be set on subscriber.
1039 * Since these parameters are not a requirement for physical replication,
1040 * we should check it to make sure it won't fail.
1041 *
1042 * - max_active_replication_origins >= number of dbs to be converted
1043 * - max_logical_replication_workers >= number of dbs to be converted
1044 * - max_worker_processes >= 1 + number of dbs to be converted
1045 *------------------------------------------------------------------------
1046 */
1047 res = PQexec(conn,
1048 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1049 "'max_logical_replication_workers', "
1050 "'max_active_replication_origins', "
1051 "'max_worker_processes', "
1052 "'primary_slot_name') "
1053 "ORDER BY name");
1054
1055 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1056 {
1057 pg_log_error("could not obtain subscriber settings: %s",
1060 }
1061
1062 max_reporigins = atoi(PQgetvalue(res, 0, 0));
1063 max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1064 max_wprocs = atoi(PQgetvalue(res, 2, 0));
1065 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1067
1068 pg_log_debug("subscriber: max_logical_replication_workers: %d",
1069 max_lrworkers);
1070 pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
1071 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1073 pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1074
1075 PQclear(res);
1076
1077 disconnect_database(conn, false);
1078
1079 if (max_reporigins < num_dbs)
1080 {
1081 pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1082 num_dbs, max_reporigins);
1083 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1084 "max_active_replication_origins", num_dbs);
1085 failed = true;
1086 }
1087
1088 if (max_lrworkers < num_dbs)
1089 {
1090 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1091 num_dbs, max_lrworkers);
1092 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1093 "max_logical_replication_workers", num_dbs);
1094 failed = true;
1095 }
1096
1097 if (max_wprocs < num_dbs + 1)
1098 {
1099 pg_log_error("subscriber requires %d worker processes, but only %d remain",
1100 num_dbs + 1, max_wprocs);
1101 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1102 "max_worker_processes", num_dbs + 1);
1103 failed = true;
1104 }
1105
1106 if (failed)
1107 exit(1);
1108}
static char * primary_slot_name

References conn, connect_database(), disconnect_database(), num_dbs, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, PQresultStatus, primary_slot_name, and server_is_in_recovery().

Referenced by main().

◆ cleanup_objects_atexit()

static void cleanup_objects_atexit ( void  )
static

Definition at line 172 of file pg_createsubscriber.c.

173{
174 if (success)
175 return;
176
177 /*
178 * If the server is promoted, there is no way to use the current setup
179 * again. Warn the user that a new replication setup should be done before
180 * trying again.
181 */
182 if (recovery_ended)
183 {
184 pg_log_warning("failed after the end of recovery");
185 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
186 "You must recreate the physical replica before continuing.");
187 }
188
189 for (int i = 0; i < num_dbs; i++)
190 {
191 struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
192
193 if (dbinfo->made_publication || dbinfo->made_replslot)
194 {
195 PGconn *conn;
196
197 conn = connect_database(dbinfo->pubconninfo, false);
198 if (conn != NULL)
199 {
200 if (dbinfo->made_publication)
201 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
202 &dbinfo->made_publication);
203 if (dbinfo->made_replslot)
204 drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
206 }
207 else
208 {
209 /*
210 * If a connection could not be established, inform the user
211 * that some objects were left on primary and should be
212 * removed before trying again.
213 */
214 if (dbinfo->made_publication)
215 {
216 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
217 dbinfo->pubname,
218 dbinfo->dbname);
219 pg_log_warning_hint("Drop this publication before trying again.");
220 }
221 if (dbinfo->made_replslot)
222 {
223 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
224 dbinfo->replslotname,
225 dbinfo->dbname);
226 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
227 }
228 }
229 }
230 }
231
232 if (standby_running)
234}
static void stop_standby_server(const char *datadir)
static char * subscriber_dir
static bool success
static bool recovery_ended
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
struct LogicalRepInfo * dbinfo

References conn, connect_database(), LogicalRepInfos::dbinfo, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), drop_replication_slot(), i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, num_dbs, pg_log_warning, pg_log_warning_hint, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, recovery_ended, LogicalRepInfo::replslotname, standby_running, stop_standby_server(), subscriber_dir, and success.

Referenced by main().

◆ concat_conninfo_dbname()

static char * concat_conninfo_dbname ( const char *  conninfo,
const char *  dbname 
)
static

Definition at line 442 of file pg_createsubscriber.c.

443{
445 char *ret;
446
447 Assert(conninfo != NULL);
448
449 appendPQExpBufferStr(buf, conninfo);
450 appendConnStrItem(buf, "dbname", dbname);
451
452 ret = pg_strdup(buf->data);
454
455 return ret;
456}
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)

References appendConnStrItem(), appendPQExpBufferStr(), Assert(), buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), and pg_strdup().

Referenced by get_publisher_databases(), and store_pub_sub_info().

◆ connect_database()

static PGconn * connect_database ( const char *  conninfo,
bool  exit_on_error 
)
static

Definition at line 539 of file pg_createsubscriber.c.

540{
541 PGconn *conn;
542 PGresult *res;
543
544 conn = PQconnectdb(conninfo);
546 {
547 pg_log_error("connection to database failed: %s",
549 PQfinish(conn);
550
551 if (exit_on_error)
552 exit(1);
553 return NULL;
554 }
555
556 /* Secure search_path */
559 {
560 pg_log_error("could not clear \"search_path\": %s",
562 PQclear(res);
563 PQfinish(conn);
564
565 if (exit_on_error)
566 exit(1);
567 return NULL;
568 }
569 PQclear(res);
570
571 return conn;
572}
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:825
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7641
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5316
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7704
@ CONNECTION_OK
Definition: libpq-fe.h:84

References ALWAYS_SECURE_SEARCH_PATH_SQL, conn, CONNECTION_OK, pg_log_error, PGRES_TUPLES_OK, PQclear, PQconnectdb(), PQerrorMessage(), PQexec(), PQfinish(), PQresultErrorMessage, PQresultStatus, and PQstatus().

Referenced by check_publisher(), check_subscriber(), cleanup_objects_atexit(), drop_failover_replication_slots(), drop_primary_replication_slot(), get_primary_sysid(), get_publisher_databases(), setup_publisher(), setup_recovery(), setup_subscriber(), and wait_for_end_recovery().

◆ create_logical_replication_slot()

static char * create_logical_replication_slot ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1378 of file pg_createsubscriber.c.

1379{
1381 PGresult *res = NULL;
1382 const char *slot_name = dbinfo->replslotname;
1383 char *slot_name_esc;
1384 char *lsn = NULL;
1385
1386 Assert(conn != NULL);
1387
1388 if (dry_run)
1389 pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
1390 slot_name, dbinfo->dbname);
1391 else
1392 pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
1393 slot_name, dbinfo->dbname);
1394
1395 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1396
1398 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1399 slot_name_esc,
1400 dbinfos.two_phase ? "true" : "false");
1401
1402 PQfreemem(slot_name_esc);
1403
1404 pg_log_debug("command is: %s", str->data);
1405
1406 if (!dry_run)
1407 {
1408 res = PQexec(conn, str->data);
1409 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1410 {
1411 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1412 slot_name, dbinfo->dbname,
1414 PQclear(res);
1416 return NULL;
1417 }
1418
1419 lsn = pg_strdup(PQgetvalue(res, 0, 0));
1420 PQclear(res);
1421 }
1422
1423 /* For cleanup purposes */
1424 dbinfo->made_replslot = true;
1425
1427
1428 return lsn;
1429}
const char * str

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_replslot, pg_log_debug, pg_log_error, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQresultErrorMessage, PQresultStatus, LogicalRepInfo::replslotname, str, and LogicalRepInfos::two_phase.

Referenced by setup_publisher().

◆ create_publication()

static void create_publication ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1624 of file pg_createsubscriber.c.

1625{
1627 PGresult *res;
1628 char *ipubname_esc;
1629 char *spubname_esc;
1630
1631 Assert(conn != NULL);
1632
1633 ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1634 spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1635
1636 /* Check if the publication already exists */
1638 "SELECT 1 FROM pg_catalog.pg_publication "
1639 "WHERE pubname = %s",
1640 spubname_esc);
1641 res = PQexec(conn, str->data);
1642 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1643 {
1644 pg_log_error("could not obtain publication information: %s",
1647 }
1648
1649 if (PQntuples(res) == 1)
1650 {
1651 /*
1652 * Unfortunately, if it reaches this code path, it will always fail
1653 * (unless you decide to change the existing publication name). That's
1654 * bad but it is very unlikely that the user will choose a name with
1655 * pg_createsubscriber_ prefix followed by the exact database oid and
1656 * a random number.
1657 */
1658 pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1659 pg_log_error_hint("Consider renaming this publication before continuing.");
1661 }
1662
1663 PQclear(res);
1665
1666 if (dry_run)
1667 pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
1668 dbinfo->pubname, dbinfo->dbname);
1669 else
1670 pg_log_info("creating publication \"%s\" in database \"%s\"",
1671 dbinfo->pubname, dbinfo->dbname);
1672
1673 appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1674 ipubname_esc);
1675
1676 pg_log_debug("command is: %s", str->data);
1677
1678 if (!dry_run)
1679 {
1680 res = PQexec(conn, str->data);
1681 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1682 {
1683 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1684 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1686 }
1687 PQclear(res);
1688 }
1689
1690 /* For cleanup purposes */
1691 dbinfo->made_publication = true;
1692
1693 PQfreemem(ipubname_esc);
1694 PQfreemem(spubname_esc);
1696}
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4405
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, LogicalRepInfo::made_publication, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear, PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQntuples, PQresultErrorMessage, PQresultStatus, LogicalRepInfo::pubname, resetPQExpBuffer(), and str.

Referenced by setup_publisher().

◆ create_subscription()

static void create_subscription ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1810 of file pg_createsubscriber.c.

1811{
1813 PGresult *res;
1814 char *pubname_esc;
1815 char *subname_esc;
1816 char *pubconninfo_esc;
1817 char *replslotname_esc;
1818
1819 Assert(conn != NULL);
1820
1821 pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1822 subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1823 pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1824 replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1825
1826 if (dry_run)
1827 pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
1828 dbinfo->subname, dbinfo->dbname);
1829 else
1830 pg_log_info("creating subscription \"%s\" in database \"%s\"",
1831 dbinfo->subname, dbinfo->dbname);
1832
1834 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1835 "WITH (create_slot = false, enabled = false, "
1836 "slot_name = %s, copy_data = false, two_phase = %s)",
1837 subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
1838 dbinfos.two_phase ? "true" : "false");
1839
1840 PQfreemem(pubname_esc);
1841 PQfreemem(subname_esc);
1842 PQfreemem(pubconninfo_esc);
1843 PQfreemem(replslotname_esc);
1844
1845 pg_log_debug("command is: %s", str->data);
1846
1847 if (!dry_run)
1848 {
1849 res = PQexec(conn, str->data);
1850 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1851 {
1852 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1853 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
1855 }
1856 PQclear(res);
1857 }
1858
1860}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, str, LogicalRepInfo::subname, and LogicalRepInfos::two_phase.

Referenced by setup_subscriber().

◆ disconnect_database()

◆ drop_existing_subscription()

static void drop_existing_subscription ( PGconn *  conn,
const char *  subname,
const char *  dbname 
)
static

Definition at line 1117 of file pg_createsubscriber.c.

1118{
1120 PGresult *res;
1121
1122 Assert(conn != NULL);
1123
1124 /*
1125 * Construct a query string. These commands are allowed to be executed
1126 * within a transaction.
1127 */
1128 appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1129 subname);
1130 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1131 subname);
1132 appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1133
1134 if (dry_run)
1135 pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
1136 subname, dbname);
1137 else
1138 {
1139 pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1140 subname, dbname);
1141
1142 res = PQexec(conn, query->data);
1143
1144 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1145 {
1146 pg_log_error("could not drop subscription \"%s\": %s",
1149 }
1150
1151 PQclear(res);
1152 }
1153
1154 destroyPQExpBuffer(query);
1155}
NameData subname

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), PQExpBufferData::data, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQexec(), PQresultErrorMessage, PQresultStatus, and subname.

Referenced by check_and_drop_existing_subscriptions().

◆ drop_failover_replication_slots()

static void drop_failover_replication_slots ( struct LogicalRepInfo *  dbinfo)
static

Definition at line 1336 of file pg_createsubscriber.c.

1337{
1338 PGconn *conn;
1339 PGresult *res;
1340
1341 conn = connect_database(dbinfo[0].subconninfo, false);
1342 if (conn != NULL)
1343 {
1344 /* Get failover replication slot names */
1345 res = PQexec(conn,
1346 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1347
1348 if (PQresultStatus(res) == PGRES_TUPLES_OK)
1349 {
1350 /* Remove failover replication slots from subscriber */
1351 for (int i = 0; i < PQntuples(res); i++)
1352 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1353 }
1354 else
1355 {
1356 pg_log_warning("could not obtain failover replication slot information: %s",
1358 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1359 }
1360
1361 PQclear(res);
1362 disconnect_database(conn, false);
1363 }
1364 else
1365 {
1366 pg_log_warning("could not drop failover replication slot");
1367 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1368 }
1369}

References conn, connect_database(), disconnect_database(), drop_replication_slot(), i, pg_log_warning, pg_log_warning_hint, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, and PQresultStatus.

Referenced by main().

◆ drop_primary_replication_slot()

static void drop_primary_replication_slot ( struct LogicalRepInfo *  dbinfo,
const char *  slotname 
)
static

Definition at line 1306 of file pg_createsubscriber.c.

1307{
1308 PGconn *conn;
1309
1310 /* Replication slot does not exist, do nothing */
1311 if (!primary_slot_name)
1312 return;
1313
1314 conn = connect_database(dbinfo[0].pubconninfo, false);
1315 if (conn != NULL)
1316 {
1317 drop_replication_slot(conn, &dbinfo[0], slotname);
1318 disconnect_database(conn, false);
1319 }
1320 else
1321 {
1322 pg_log_warning("could not drop replication slot \"%s\" on primary",
1323 slotname);
1324 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1325 }
1326}

References conn, connect_database(), disconnect_database(), drop_replication_slot(), pg_log_warning, pg_log_warning_hint, and primary_slot_name.

Referenced by main().

◆ drop_publication()

static void drop_publication ( PGconn *  conn,
const char *  pubname,
const char *  dbname,
bool *  made_publication 
)
static

Definition at line 1702 of file pg_createsubscriber.c.

1704{
1706 PGresult *res;
1707 char *pubname_esc;
1708
1709 Assert(conn != NULL);
1710
1711 pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1712
1713 if (dry_run)
1714 pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
1715 pubname, dbname);
1716 else
1717 pg_log_info("dropping publication \"%s\" in database \"%s\"",
1718 pubname, dbname);
1719
1720 appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1721
1722 PQfreemem(pubname_esc);
1723
1724 pg_log_debug("command is: %s", str->data);
1725
1726 if (!dry_run)
1727 {
1728 res = PQexec(conn, str->data);
1729 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1730 {
1731 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1732 pubname, dbname, PQresultErrorMessage(res));
1733 *made_publication = false; /* don't try again. */
1734
1735 /*
1736 * Don't disconnect and exit here. This routine is used by primary
1737 * (cleanup publication / replication slot due to an error) and
1738 * subscriber (remove the replicated publications). In both cases,
1739 * it can continue and provide instructions for the user to remove
1740 * it later if cleanup fails.
1741 */
1742 }
1743 PQclear(res);
1744 }
1745
1747}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), dbname, destroyPQExpBuffer(), dry_run, pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, and str.

Referenced by check_and_drop_publications(), and cleanup_objects_atexit().

◆ drop_replication_slot()

static void drop_replication_slot ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo,
const char *  slot_name 
)
static

Definition at line 1432 of file pg_createsubscriber.c.

1434{
1436 char *slot_name_esc;
1437 PGresult *res;
1438
1439 Assert(conn != NULL);
1440
1441 if (dry_run)
1442 pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
1443 slot_name, dbinfo->dbname);
1444 else
1445 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1446 slot_name, dbinfo->dbname);
1447
1448 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1449
1450 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1451
1452 PQfreemem(slot_name_esc);
1453
1454 pg_log_debug("command is: %s", str->data);
1455
1456 if (!dry_run)
1457 {
1458 res = PQexec(conn, str->data);
1459 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1460 {
1461 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1462 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1463 dbinfo->made_replslot = false; /* don't try again. */
1464 }
1465
1466 PQclear(res);
1467 }
1468
1470}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_replslot, pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, and str.

Referenced by cleanup_objects_atexit(), drop_failover_replication_slots(), and drop_primary_replication_slot().

◆ enable_subscription()

static void enable_subscription ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1968 of file pg_createsubscriber.c.

1969{
1971 PGresult *res;
1972 char *subname;
1973
1974 Assert(conn != NULL);
1975
1976 subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1977
1978 if (dry_run)
1979 pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
1980 dbinfo->subname, dbinfo->dbname);
1981 else
1982 pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1983 dbinfo->subname, dbinfo->dbname);
1984
1985 appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1986
1987 pg_log_debug("command is: %s", str->data);
1988
1989 if (!dry_run)
1990 {
1991 res = PQexec(conn, str->data);
1992 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1993 {
1994 pg_log_error("could not enable subscription \"%s\": %s",
1995 dbinfo->subname, PQresultErrorMessage(res));
1997 }
1998
1999 PQclear(res);
2000 }
2001
2004}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear, PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage, PQresultStatus, str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ generate_object_name()

static char * generate_object_name ( PGconn *  conn)
static

Definition at line 724 of file pg_createsubscriber.c.

725{
726 PGresult *res;
727 Oid oid;
728 uint32 rand;
729 char *objname;
730
731 res = PQexec(conn,
732 "SELECT oid FROM pg_catalog.pg_database "
733 "WHERE datname = pg_catalog.current_database()");
735 {
736 pg_log_error("could not obtain database OID: %s",
739 }
740
741 if (PQntuples(res) != 1)
742 {
743 pg_log_error("could not obtain database OID: got %d rows, expected %d row",
744 PQntuples(res), 1);
746 }
747
748 /* Database OID */
749 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
750
751 PQclear(res);
752
753 /* Random unsigned integer */
754 rand = pg_prng_uint32(&prng_state);
755
756 /*
757 * Build the object name. The name must not exceed NAMEDATALEN - 1. This
758 * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
759 * '\0').
760 */
761 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
762
763 return objname;
764}
static pg_prng_state prng_state
uint32 pg_prng_uint32(pg_prng_state *state)
Definition: pg_prng.c:227
unsigned int Oid
Definition: postgres_ext.h:32
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43

References conn, disconnect_database(), pg_log_error, pg_prng_uint32(), PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, prng_state, and psprintf().

Referenced by setup_publisher().

◆ get_base_conninfo()

static char * get_base_conninfo ( const char *  conninfo,
char **  dbname 
)
static

Definition at line 296 of file pg_createsubscriber.c.

297{
299 PQconninfoOption *conn_opts;
300 PQconninfoOption *conn_opt;
301 char *errmsg = NULL;
302 char *ret;
303
304 conn_opts = PQconninfoParse(conninfo, &errmsg);
305 if (conn_opts == NULL)
306 {
307 pg_log_error("could not parse connection string: %s", errmsg);
309 return NULL;
310 }
311
313 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
314 {
315 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
316 {
317 if (strcmp(conn_opt->keyword, "dbname") == 0)
318 {
319 if (dbname)
320 *dbname = pg_strdup(conn_opt->val);
321 continue;
322 }
323 appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
324 }
325 }
326
327 ret = pg_strdup(buf->data);
328
330 PQconninfoFree(conn_opts);
331
332 return ret;
333}
int errmsg(const char *fmt,...)
Definition: elog.c:1080
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7525
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:6241

References appendConnStrItem(), buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), errmsg(), _PQconninfoOption::keyword, pg_log_error, pg_strdup(), PQconninfoFree(), PQconninfoParse(), PQfreemem(), and _PQconninfoOption::val.

Referenced by main().

◆ get_exec_path()

static char * get_exec_path ( const char *  argv0,
const char *  progname 
)
static

Definition at line 366 of file pg_createsubscriber.c.

367{
368 char *versionstr;
369 char *exec_path;
370 int ret;
371
372 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
374 ret = find_other_exec(argv0, progname, versionstr, exec_path);
375
376 if (ret < 0)
377 {
378 char full_path[MAXPGPATH];
379
380 if (find_my_exec(argv0, full_path) < 0)
381 strlcpy(full_path, progname, sizeof(full_path));
382
383 if (ret == -1)
384 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
385 progname, "pg_createsubscriber", full_path);
386 else
387 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
388 progname, full_path, "pg_createsubscriber");
389 }
390
391 pg_log_debug("%s path is: %s", progname, exec_path);
392
393 return exec_path;
394}
int find_my_exec(const char *argv0, char *retpath)
Definition: exec.c:161
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition: exec.c:311
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define MAXPGPATH
static const char * progname
static char * argv0
Definition: pg_ctl.c:94
static char * exec_path
Definition: pg_ctl.c:89
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45

References argv0, exec_path, find_my_exec(), find_other_exec(), MAXPGPATH, pg_fatal, pg_log_debug, pg_malloc(), progname, psprintf(), and strlcpy().

Referenced by main().

◆ get_primary_sysid()

static uint64 get_primary_sysid ( const char *  conninfo)
static

Definition at line 594 of file pg_createsubscriber.c.

595{
596 PGconn *conn;
597 PGresult *res;
598 uint64 sysid;
599
600 pg_log_info("getting system identifier from publisher");
601
602 conn = connect_database(conninfo, true);
603
604 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
606 {
607 pg_log_error("could not get system identifier: %s",
610 }
611 if (PQntuples(res) != 1)
612 {
613 pg_log_error("could not get system identifier: got %d rows, expected %d row",
614 PQntuples(res), 1);
616 }
617
618 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
619
620 pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
621
622 PQclear(res);
624
625 return sysid;
626}
uint64_t uint64
Definition: c.h:542

References conn, connect_database(), disconnect_database(), pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, and PQresultStatus.

Referenced by main().

◆ get_publisher_databases()

static void get_publisher_databases ( struct CreateSubscriberOptions *  opt,
bool  dbnamespecified 
)
static

Definition at line 2012 of file pg_createsubscriber.c.

2014{
2015 PGconn *conn;
2016 PGresult *res;
2017
2018 /* If a database name was specified, just connect to it. */
2019 if (dbnamespecified)
2021 else
2022 {
2023 /* Otherwise, try postgres first and then template1. */
2024 char *conninfo;
2025
2026 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
2027 conn = connect_database(conninfo, false);
2028 pg_free(conninfo);
2029 if (!conn)
2030 {
2031 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
2032 conn = connect_database(conninfo, true);
2033 pg_free(conninfo);
2034 }
2035 }
2036
2037 res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
2038 if (PQresultStatus(res) != PGRES_TUPLES_OK)
2039 {
2040 pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2041 PQclear(res);
2043 }
2044
2045 for (int i = 0; i < PQntuples(res); i++)
2046 {
2047 const char *dbname = PQgetvalue(res, i, 0);
2048
2050
2051 /* Increment num_dbs to reflect multiple --database options */
2052 num_dbs++;
2053 }
2054
2055 PQclear(res);
2056 disconnect_database(conn, false);
2057}
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition: simple_list.c:63
SimpleStringList database_names

References concat_conninfo_dbname(), conn, connect_database(), CreateSubscriberOptions::database_names, dbname, disconnect_database(), i, num_dbs, pg_free(), pg_log_error, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, CreateSubscriberOptions::pub_conninfo_str, and simple_string_list_append().

Referenced by main().

◆ get_standby_sysid()

static uint64 get_standby_sysid ( const char *  datadir)
static

Definition at line 634 of file pg_createsubscriber.c.

635{
636 ControlFileData *cf;
637 bool crc_ok;
638 uint64 sysid;
639
640 pg_log_info("getting system identifier from subscriber");
641
642 cf = get_controlfile(datadir, &crc_ok);
643 if (!crc_ok)
644 pg_fatal("control file appears to be corrupt");
645
646 sysid = cf->system_identifier;
647
648 pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
649
650 pg_free(cf);
651
652 return sysid;
653}
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
uint64 system_identifier
Definition: pg_control.h:110

References datadir, get_controlfile(), pg_fatal, pg_free(), pg_log_info, and ControlFileData::system_identifier.

Referenced by main().

◆ get_sub_conninfo()

static char * get_sub_conninfo ( const struct CreateSubscriberOptions *  opt)
static

Definition at line 340 of file pg_createsubscriber.c.

341{
343 char *ret;
344
345 appendConnStrItem(buf, "port", opt->sub_port);
346#if !defined(WIN32)
347 appendConnStrItem(buf, "host", opt->socket_dir);
348#endif
349 if (opt->sub_username != NULL)
350 appendConnStrItem(buf, "user", opt->sub_username);
351 appendConnStrItem(buf, "fallback_application_name", progname);
352
353 ret = pg_strdup(buf->data);
354
356
357 return ret;
358}

References appendConnStrItem(), buf, createPQExpBuffer(), destroyPQExpBuffer(), pg_strdup(), progname, CreateSubscriberOptions::socket_dir, CreateSubscriberOptions::sub_port, and CreateSubscriberOptions::sub_username.

Referenced by main().

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 2060 of file pg_createsubscriber.c.

2061{
2062 static struct option long_options[] =
2063 {
2064 {"all", no_argument, NULL, 'a'},
2065 {"database", required_argument, NULL, 'd'},
2066 {"pgdata", required_argument, NULL, 'D'},
2067 {"dry-run", no_argument, NULL, 'n'},
2068 {"subscriber-port", required_argument, NULL, 'p'},
2069 {"publisher-server", required_argument, NULL, 'P'},
2070 {"socketdir", required_argument, NULL, 's'},
2071 {"recovery-timeout", required_argument, NULL, 't'},
2072 {"enable-two-phase", no_argument, NULL, 'T'},
2073 {"subscriber-username", required_argument, NULL, 'U'},
2074 {"verbose", no_argument, NULL, 'v'},
2075 {"version", no_argument, NULL, 'V'},
2076 {"help", no_argument, NULL, '?'},
2077 {"config-file", required_argument, NULL, 1},
2078 {"publication", required_argument, NULL, 2},
2079 {"replication-slot", required_argument, NULL, 3},
2080 {"subscription", required_argument, NULL, 4},
2081 {"clean", required_argument, NULL, 5},
2082 {NULL, 0, NULL, 0}
2083 };
2084
2085 struct CreateSubscriberOptions opt = {0};
2086
2087 int c;
2088 int option_index;
2089
2090 char *pub_base_conninfo;
2091 char *sub_base_conninfo;
2092 char *dbname_conninfo = NULL;
2093
2094 uint64 pub_sysid;
2095 uint64 sub_sysid;
2096 struct stat statbuf;
2097
2098 char *consistent_lsn;
2099
2100 char pidfile[MAXPGPATH];
2101
2102 pg_logging_init(argv[0]);
2104 progname = get_progname(argv[0]);
2105 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2106
2107 if (argc > 1)
2108 {
2109 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2110 {
2111 usage();
2112 exit(0);
2113 }
2114 else if (strcmp(argv[1], "-V") == 0
2115 || strcmp(argv[1], "--version") == 0)
2116 {
2117 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2118 exit(0);
2119 }
2120 }
2121
2122 /* Default settings */
2123 subscriber_dir = NULL;
2124 opt.config_file = NULL;
2125 opt.pub_conninfo_str = NULL;
2126 opt.socket_dir = NULL;
2128 opt.sub_username = NULL;
2129 opt.two_phase = false;
2131 {
2132 0
2133 };
2134 opt.recovery_timeout = 0;
2135 opt.all_dbs = false;
2136
2137 /*
2138 * Don't allow it to be run as root. It uses pg_ctl which does not allow
2139 * it either.
2140 */
2141#ifndef WIN32
2142 if (geteuid() == 0)
2143 {
2144 pg_log_error("cannot be executed by \"root\"");
2145 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2146 progname);
2147 exit(1);
2148 }
2149#endif
2150
2152
2153 while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
2154 long_options, &option_index)) != -1)
2155 {
2156 switch (c)
2157 {
2158 case 'a':
2159 opt.all_dbs = true;
2160 break;
2161 case 'd':
2163 {
2165 num_dbs++;
2166 }
2167 else
2168 pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2169 break;
2170 case 'D':
2173 break;
2174 case 'n':
2175 dry_run = true;
2176 break;
2177 case 'p':
2178 opt.sub_port = pg_strdup(optarg);
2179 break;
2180 case 'P':
2182 break;
2183 case 's':
2186 break;
2187 case 't':
2188 opt.recovery_timeout = atoi(optarg);
2189 break;
2190 case 'T':
2191 opt.two_phase = true;
2192 break;
2193 case 'U':
2195 break;
2196 case 'v':
2198 break;
2199 case 1:
2201 break;
2202 case 2:
2204 {
2206 num_pubs++;
2207 }
2208 else
2209 pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
2210 break;
2211 case 3:
2213 {
2215 num_replslots++;
2216 }
2217 else
2218 pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2219 break;
2220 case 4:
2222 {
2224 num_subs++;
2225 }
2226 else
2227 pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2228 break;
2229 case 5:
2232 else
2233 pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
2234 break;
2235 default:
2236 /* getopt_long already emitted a complaint */
2237 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2238 exit(1);
2239 }
2240 }
2241
2242 /* Validate that --all is not used with incompatible options */
2243 if (opt.all_dbs)
2244 {
2245 char *bad_switch = NULL;
2246
2247 if (num_dbs > 0)
2248 bad_switch = "--database";
2249 else if (num_pubs > 0)
2250 bad_switch = "--publication";
2251 else if (num_replslots > 0)
2252 bad_switch = "--replication-slot";
2253 else if (num_subs > 0)
2254 bad_switch = "--subscription";
2255
2256 if (bad_switch)
2257 {
2258 pg_log_error("options %s and -a/--all cannot be used together", bad_switch);
2259 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2260 exit(1);
2261 }
2262 }
2263
2264 /* Any non-option arguments? */
2265 if (optind < argc)
2266 {
2267 pg_log_error("too many command-line arguments (first is \"%s\")",
2268 argv[optind]);
2269 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2270 exit(1);
2271 }
2272
2273 /* Required arguments */
2274 if (subscriber_dir == NULL)
2275 {
2276 pg_log_error("no subscriber data directory specified");
2277 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2278 exit(1);
2279 }
2280
2281 /* If socket directory is not provided, use the current directory */
2282 if (opt.socket_dir == NULL)
2283 {
2284 char cwd[MAXPGPATH];
2285
2286 if (!getcwd(cwd, MAXPGPATH))
2287 pg_fatal("could not determine current directory");
2288 opt.socket_dir = pg_strdup(cwd);
2290 }
2291
2292 /*
2293 * Parse connection string. Build a base connection string that might be
2294 * reused by multiple databases.
2295 */
2296 if (opt.pub_conninfo_str == NULL)
2297 {
2298 /*
2299 * TODO use primary_conninfo (if available) from subscriber and
2300 * extract publisher connection string. Assume that there are
2301 * identical entries for physical and logical replication. If there is
2302 * not, we would fail anyway.
2303 */
2304 pg_log_error("no publisher connection string specified");
2305 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2306 exit(1);
2307 }
2308
2309 if (dry_run)
2310 pg_log_info("Executing in dry-run mode.\n"
2311 "The target directory will not be modified.");
2312
2313 pg_log_info("validating publisher connection string");
2314 pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2315 &dbname_conninfo);
2316 if (pub_base_conninfo == NULL)
2317 exit(1);
2318
2319 pg_log_info("validating subscriber connection string");
2320 sub_base_conninfo = get_sub_conninfo(&opt);
2321
2322 /*
2323 * Fetch all databases from the source (publisher) and treat them as if
2324 * the user specified has multiple --database options, one for each source
2325 * database.
2326 */
2327 if (opt.all_dbs)
2328 {
2329 bool dbnamespecified = (dbname_conninfo != NULL);
2330
2331 get_publisher_databases(&opt, dbnamespecified);
2332 }
2333
2334 if (opt.database_names.head == NULL)
2335 {
2336 pg_log_info("no database was specified");
2337
2338 /*
2339 * Try to obtain the dbname from the publisher conninfo. If dbname
2340 * parameter is not available, error out.
2341 */
2342 if (dbname_conninfo)
2343 {
2344 simple_string_list_append(&opt.database_names, dbname_conninfo);
2345 num_dbs++;
2346
2347 pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2348 dbname_conninfo);
2349 }
2350 else
2351 {
2352 pg_log_error("no database name specified");
2353 pg_log_error_hint("Try \"%s --help\" for more information.",
2354 progname);
2355 exit(1);
2356 }
2357 }
2358
2359 /* Number of object names must match number of databases */
2360 if (num_pubs > 0 && num_pubs != num_dbs)
2361 {
2362 pg_log_error("wrong number of publication names specified");
2363 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2364 num_pubs, num_dbs);
2365 exit(1);
2366 }
2367 if (num_subs > 0 && num_subs != num_dbs)
2368 {
2369 pg_log_error("wrong number of subscription names specified");
2370 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2371 num_subs, num_dbs);
2372 exit(1);
2373 }
2374 if (num_replslots > 0 && num_replslots != num_dbs)
2375 {
2376 pg_log_error("wrong number of replication slot names specified");
2377 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2379 exit(1);
2380 }
2381
2382 /* Verify the object types specified for removal from the subscriber */
2383 for (SimpleStringListCell *cell = opt.objecttypes_to_clean.head; cell; cell = cell->next)
2384 {
2385 if (pg_strcasecmp(cell->val, "publications") == 0)
2387 else
2388 {
2389 pg_log_error("invalid object type \"%s\" specified for --clean", cell->val);
2390 pg_log_error_hint("The valid value is: \"%s\"", "publications");
2391 exit(1);
2392 }
2393 }
2394
2395 /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2396 pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2397 pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2398
2399 /* Rudimentary check for a data directory */
2401
2403
2404 /*
2405 * Store database information for publisher and subscriber. It should be
2406 * called before atexit() because its return is used in the
2407 * cleanup_objects_atexit().
2408 */
2409 dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2410
2411 /* Register a function to clean up objects in case of failure */
2412 atexit(cleanup_objects_atexit);
2413
2414 /*
2415 * Check if the subscriber data directory has the same system identifier
2416 * than the publisher data directory.
2417 */
2419 sub_sysid = get_standby_sysid(subscriber_dir);
2420 if (pub_sysid != sub_sysid)
2421 pg_fatal("subscriber data directory is not a copy of the source database cluster");
2422
2423 /* Subscriber PID file */
2424 snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2425
2426 /*
2427 * The standby server must not be running. If the server is started under
2428 * service manager and pg_createsubscriber stops it, the service manager
2429 * might react to this action and start the server again. Therefore,
2430 * refuse to proceed if the server is running to avoid possible failures.
2431 */
2432 if (stat(pidfile, &statbuf) == 0)
2433 {
2434 pg_log_error("standby server is running");
2435 pg_log_error_hint("Stop the standby server and try again.");
2436 exit(1);
2437 }
2438
2439 /*
2440 * Start a short-lived standby server with temporary parameters (provided
2441 * by command-line options). The goal is to avoid connections during the
2442 * transformation steps.
2443 */
2444 pg_log_info("starting the standby server with command-line options");
2445 start_standby_server(&opt, true, false);
2446
2447 /* Check if the standby server is ready for logical replication */
2449
2450 /* Check if the primary server is ready for logical replication */
2452
2453 /*
2454 * Stop the target server. The recovery process requires that the server
2455 * reaches a consistent state before targeting the recovery stop point.
2456 * Make sure a consistent state is reached (stop the target server
2457 * guarantees it) *before* creating the replication slots in
2458 * setup_publisher().
2459 */
2460 pg_log_info("stopping the subscriber");
2462
2463 /* Create the required objects for each database on publisher */
2464 consistent_lsn = setup_publisher(dbinfos.dbinfo);
2465
2466 /* Write the required recovery parameters */
2467 setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2468
2469 /*
2470 * Start subscriber so the recovery parameters will take effect. Wait
2471 * until accepting connections. We don't want to start logical replication
2472 * during setup.
2473 */
2474 pg_log_info("starting the subscriber");
2475 start_standby_server(&opt, true, true);
2476
2477 /* Waiting the subscriber to be promoted */
2479
2480 /*
2481 * Create the subscription for each database on subscriber. It does not
2482 * enable it immediately because it needs to adjust the replication start
2483 * point to the LSN reported by setup_publisher(). It also cleans up
2484 * publications created by this tool and replication to the standby.
2485 */
2486 setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2487
2488 /* Remove primary_slot_name if it exists on primary */
2490
2491 /* Remove failover replication slots if they exist on subscriber */
2493
2494 /* Stop the subscriber */
2495 pg_log_info("stopping the subscriber");
2497
2498 /* Change system identifier from subscriber */
2500
2501 success = true;
2502
2503 pg_log_info("Done!");
2504
2505 return 0;
2506}
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1202
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:430
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:25
#define required_argument
Definition: getopt_long.h:26
void pg_logging_increase_verbosity(void)
Definition: logging.c:185
void pg_logging_init(const char *argv0)
Definition: logging.c:83
void pg_logging_set_level(enum pg_log_level new_level)
Definition: logging.c:176
@ PG_LOG_WARNING
Definition: logging.h:38
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
static char * pg_ctl_path
static char * get_exec_path(const char *argv0, const char *progname)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static void check_data_directory(const char *datadir)
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)
static char * pg_resetwal_path
static void usage(void)
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
PGDLLIMPORT int optind
Definition: getopt.c:51
PGDLLIMPORT char * optarg
Definition: getopt.c:53
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:32
void canonicalize_path(char *path)
Definition: path.c:337
#define snprintf
Definition: port.h:260
const char * get_progname(const char *argv0)
Definition: path.c:652
char * c
void get_restricted_token(void)
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition: simple_list.c:87
struct SimpleStringList SimpleStringList
SimpleStringList objecttypes_to_clean
SimpleStringList replslot_names
struct SimpleStringListCell * next
Definition: simple_list.h:34
SimpleStringListCell * head
Definition: simple_list.h:42

References CreateSubscriberOptions::all_dbs, canonicalize_path(), check_data_directory(), check_publisher(), check_subscriber(), cleanup_objects_atexit(), CreateSubscriberOptions::config_file, CreateSubscriberOptions::database_names, LogicalRepInfos::dbinfo, dbinfos, DEFAULT_SUB_PORT, drop_failover_replication_slots(), drop_primary_replication_slot(), dry_run, get_base_conninfo(), get_exec_path(), get_primary_sysid(), get_progname(), get_publisher_databases(), get_restricted_token(), get_standby_sysid(), get_sub_conninfo(), getopt_long(), SimpleStringList::head, MAXPGPATH, modify_subscriber_sysid(), SimpleStringListCell::next, no_argument, num_dbs, num_pubs, num_replslots, num_subs, OBJECTTYPE_PUBLICATIONS, CreateSubscriberOptions::objecttypes_to_clean, LogicalRepInfos::objecttypes_to_clean, optarg, optind, pg_ctl_path, pg_fatal, pg_log_error, pg_log_error_detail, pg_log_error_hint, pg_log_info, PG_LOG_WARNING, pg_logging_increase_verbosity(), pg_logging_init(), pg_logging_set_level(), pg_resetwal_path, pg_strcasecmp(), pg_strdup(), PG_TEXTDOMAIN, primary_slot_name, progname, CreateSubscriberOptions::pub_conninfo_str, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, CreateSubscriberOptions::recovery_timeout, CreateSubscriberOptions::replslot_names, required_argument, set_pglocale_pgservice(), setup_publisher(), setup_recovery(), setup_subscriber(), simple_string_list_append(), simple_string_list_member(), snprintf, CreateSubscriberOptions::socket_dir, start_standby_server(), stat, stop_standby_server(), store_pub_sub_info(), CreateSubscriberOptions::sub_names, CreateSubscriberOptions::sub_port, CreateSubscriberOptions::sub_username, LogicalRepInfo::subconninfo, subscriber_dir, success, CreateSubscriberOptions::two_phase, LogicalRepInfos::two_phase, usage(), and wait_for_end_recovery().

◆ modify_subscriber_sysid()

static void modify_subscriber_sysid ( const struct CreateSubscriberOptions *  opt)
static

Definition at line 661 of file pg_createsubscriber.c.

662{
663 ControlFileData *cf;
664 bool crc_ok;
665 struct timeval tv;
666
667 char *cmd_str;
668
669 pg_log_info("modifying system identifier of subscriber");
670
671 cf = get_controlfile(subscriber_dir, &crc_ok);
672 if (!crc_ok)
673 pg_fatal("control file appears to be corrupt");
674
675 /*
676 * Select a new system identifier.
677 *
678 * XXX this code was extracted from BootStrapXLOG().
679 */
680 gettimeofday(&tv, NULL);
681 cf->system_identifier = ((uint64) tv.tv_sec) << 32;
682 cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
683 cf->system_identifier |= getpid() & 0xFFF;
684
685 if (dry_run)
686 pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
688 else
689 {
691 pg_log_info("system identifier is %" PRIu64 " on subscriber",
693 }
694
695 if (dry_run)
696 pg_log_info("dry-run: would run pg_resetwal on the subscriber");
697 else
698 pg_log_info("running pg_resetwal on the subscriber");
699
700 cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
702
703 pg_log_debug("pg_resetwal command is: %s", cmd_str);
704
705 if (!dry_run)
706 {
707 int rc = system(cmd_str);
708
709 if (rc == 0)
710 pg_log_info("successfully reset WAL on the subscriber");
711 else
712 pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
713 }
714
715 pg_free(cf);
716}
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
#define DEVNULL
Definition: port.h:161
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:33
int gettimeofday(struct timeval *tp, void *tzp)

References DEVNULL, dry_run, get_controlfile(), gettimeofday(), pg_fatal, pg_free(), pg_log_debug, pg_log_info, pg_resetwal_path, psprintf(), subscriber_dir, ControlFileData::system_identifier, update_controlfile(), and wait_result_to_str().

Referenced by main().

◆ pg_ctl_status()

static void pg_ctl_status ( const char *  pg_ctl_cmd,
int  rc 
)
static

Definition at line 1476 of file pg_createsubscriber.c.

1477{
1478 if (rc != 0)
1479 {
1480 if (WIFEXITED(rc))
1481 {
1482 pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
1483 }
1484 else if (WIFSIGNALED(rc))
1485 {
1486#if defined(WIN32)
1487 pg_log_error("pg_ctl was terminated by exception 0x%X",
1488 WTERMSIG(rc));
1489 pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1490#else
1491 pg_log_error("pg_ctl was terminated by signal %d: %s",
1492 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1493#endif
1494 }
1495 else
1496 {
1497 pg_log_error("pg_ctl exited with unrecognized status %d", rc);
1498 }
1499
1500 pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
1501 exit(1);
1502 }
1503}
const char * pg_strsignal(int signum)
Definition: pgstrsignal.c:39
#define WIFEXITED(w)
Definition: win32_port.h:150
#define WIFSIGNALED(w)
Definition: win32_port.h:151
#define WTERMSIG(w)
Definition: win32_port.h:153
#define WEXITSTATUS(w)
Definition: win32_port.h:152

References pg_log_error, pg_log_error_detail, pg_strsignal(), WEXITSTATUS, WIFEXITED, WIFSIGNALED, and WTERMSIG.

Referenced by start_standby_server(), and stop_standby_server().

◆ server_is_in_recovery()

static bool server_is_in_recovery ( PGconn *  conn)
static

Definition at line 849 of file pg_createsubscriber.c.

850{
851 PGresult *res;
852 int ret;
853
854 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
855
857 {
858 pg_log_error("could not obtain recovery progress: %s",
861 }
862
863
864 ret = strcmp("t", PQgetvalue(res, 0, 0));
865
866 PQclear(res);
867
868 return ret == 0;
869}

References conn, disconnect_database(), pg_log_error, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetvalue, PQresultErrorMessage, and PQresultStatus.

Referenced by check_publisher(), check_subscriber(), and wait_for_end_recovery().

◆ set_replication_progress()

static void set_replication_progress ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo,
const char *  lsn 
)
static

Definition at line 1873 of file pg_createsubscriber.c.

1874{
1876 PGresult *res;
1877 Oid suboid;
1878 char *subname;
1879 char *dbname;
1880 char *originname;
1881 char *lsnstr;
1882
1883 Assert(conn != NULL);
1884
1885 subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
1886 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1887
1889 "SELECT s.oid FROM pg_catalog.pg_subscription s "
1890 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1891 "WHERE s.subname = %s AND d.datname = %s",
1892 subname, dbname);
1893
1894 res = PQexec(conn, str->data);
1895 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1896 {
1897 pg_log_error("could not obtain subscription OID: %s",
1900 }
1901
1902 if (PQntuples(res) != 1 && !dry_run)
1903 {
1904 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1905 PQntuples(res), 1);
1907 }
1908
1909 if (dry_run)
1910 {
1911 suboid = InvalidOid;
1912 lsnstr = psprintf("%X/%08X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1913 }
1914 else
1915 {
1916 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1917 lsnstr = psprintf("%s", lsn);
1918 }
1919
1920 PQclear(res);
1921
1922 /*
1923 * The origin name is defined as pg_%u. %u is the subscription OID. See
1924 * ApplyWorkerMain().
1925 */
1926 originname = psprintf("pg_%u", suboid);
1927
1928 if (dry_run)
1929 pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1930 originname, lsnstr, dbinfo->dbname);
1931 else
1932 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1933 originname, lsnstr, dbinfo->dbname);
1934
1937 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1938 originname, lsnstr);
1939
1940 pg_log_debug("command is: %s", str->data);
1941
1942 if (!dry_run)
1943 {
1944 res = PQexec(conn, str->data);
1945 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1946 {
1947 pg_log_error("could not set replication progress for subscription \"%s\": %s",
1948 dbinfo->subname, PQresultErrorMessage(res));
1950 }
1951 PQclear(res);
1952 }
1953
1956 pg_free(originname);
1957 pg_free(lsnstr);
1959}
#define InvalidOid
Definition: postgres_ext.h:37
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, InvalidOid, InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear, PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue, PQntuples, PQresultErrorMessage, PQresultStatus, psprintf(), resetPQExpBuffer(), str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ setup_publisher()

static char * setup_publisher ( struct LogicalRepInfo *  dbinfo)
static

Definition at line 773 of file pg_createsubscriber.c.

774{
775 char *lsn = NULL;
776
777 pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
778
779 for (int i = 0; i < num_dbs; i++)
780 {
781 PGconn *conn;
782 char *genname = NULL;
783
784 conn = connect_database(dbinfo[i].pubconninfo, true);
785
786 /*
787 * If an object name was not specified as command-line options, assign
788 * a generated object name. The replication slot has a different rule.
789 * The subscription name is assigned to the replication slot name if
790 * no replication slot is specified. It follows the same rule as
791 * CREATE SUBSCRIPTION.
792 */
793 if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
794 genname = generate_object_name(conn);
795 if (num_pubs == 0)
796 dbinfo[i].pubname = pg_strdup(genname);
797 if (num_subs == 0)
798 dbinfo[i].subname = pg_strdup(genname);
799 if (num_replslots == 0)
800 dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
801
802 /*
803 * Create publication on publisher. This step should be executed
804 * *before* promoting the subscriber to avoid any transactions between
805 * consistent LSN and the new publication rows (such transactions
806 * wouldn't see the new publication rows resulting in an error).
807 */
808 create_publication(conn, &dbinfo[i]);
809
810 /* Create replication slot on publisher */
811 if (lsn)
812 pg_free(lsn);
813 lsn = create_logical_replication_slot(conn, &dbinfo[i]);
814 if (lsn == NULL && !dry_run)
815 exit(1);
816
817 /*
818 * Since we are using the LSN returned by the last replication slot as
819 * recovery_target_lsn, this LSN is ahead of the current WAL position
820 * and the recovery waits until the publisher writes a WAL record to
821 * reach the target and ends the recovery. On idle systems, this wait
822 * time is unpredictable and could lead to failure in promoting the
823 * subscriber. To avoid that, insert a harmless WAL record.
824 */
825 if (i == num_dbs - 1 && !dry_run)
826 {
827 PGresult *res;
828
829 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
831 {
832 pg_log_error("could not write an additional WAL record: %s",
835 }
836 PQclear(res);
837 }
838
840 }
841
842 return lsn;
843}
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static char * generate_object_name(PGconn *conn)
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition: pg_prng.c:89

References conn, connect_database(), create_logical_replication_slot(), create_publication(), disconnect_database(), dry_run, generate_object_name(), i, num_dbs, num_pubs, num_replslots, num_subs, pg_free(), pg_log_error, pg_prng_seed(), pg_strdup(), PGRES_TUPLES_OK, PQclear, PQexec(), PQresultErrorMessage, PQresultStatus, prng_state, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, LogicalRepInfo::subname, and subname.

Referenced by main().

◆ setup_recovery()

static void setup_recovery ( const struct LogicalRepInfo *  dbinfo,
const char *  datadir,
const char *  lsn 
)
static

Definition at line 1237 of file pg_createsubscriber.c.

1238{
1239 PGconn *conn;
1241
1242 /*
1243 * Although the recovery parameters will be written to the subscriber,
1244 * use a publisher connection. The primary_conninfo is generated using the
1245 * connection settings.
1246 */
1247 conn = connect_database(dbinfo[0].pubconninfo, true);
1248
1249 /*
1250 * Write recovery parameters.
1251 *
1252 * The subscriber is not running yet. In dry run mode, the recovery
1253 * parameters *won't* be written. An invalid LSN is used for printing
1254 * purposes. Additional recovery parameters are added here. It avoids
1255 * unexpected behavior such as end of recovery as soon as a consistent
1256 * state is reached (recovery_target) and failure due to multiple recovery
1257 * targets (name, time, xid, LSN).
1258 */
1260 appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1262 "recovery_target_timeline = 'latest'\n");
1263
1264 /*
1265 * Set recovery_target_inclusive = false to avoid reapplying the
1266 * transaction committed at 'lsn' after subscription is enabled. This is
1267 * because the provided 'lsn' is also used as the replication start point
1268 * for the subscription. So, the server can send the transaction committed
1269 * at that 'lsn' after replication is started which can lead to applying
1270 * the same transaction twice if we keep recovery_target_inclusive = true.
1271 */
1273 "recovery_target_inclusive = false\n");
1275 "recovery_target_action = promote\n");
1276 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1277 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1278 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1279
1280 if (dry_run)
1281 {
1282 appendPQExpBufferStr(recoveryconfcontents, "# dry run mode\n");
1284 "recovery_target_lsn = '%X/%08X'\n",
1286 }
1287 else
1288 {
1289 appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1290 lsn);
1292 }
1293 disconnect_database(conn, false);
1294
1295 pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1296}
static PQExpBuffer recoveryconfcontents
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
Definition: recovery_gen.c:125
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
Definition: recovery_gen.c:28

References appendPQExpBuffer(), appendPQExpBufferStr(), conn, connect_database(), PQExpBufferData::data, datadir, disconnect_database(), dry_run, GenerateRecoveryConfig(), InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_log_debug, recoveryconfcontents, and WriteRecoveryConfig().

Referenced by main().

◆ setup_subscriber()

static void setup_subscriber ( struct LogicalRepInfo *  dbinfo,
const char *  consistent_lsn 
)
static

Definition at line 1201 of file pg_createsubscriber.c.

1202{
1203 for (int i = 0; i < num_dbs; i++)
1204 {
1205 PGconn *conn;
1206
1207 /* Connect to subscriber. */
1208 conn = connect_database(dbinfo[i].subconninfo, true);
1209
1210 /*
1211 * We don't need the pre-existing subscriptions on the newly formed
1212 * subscriber. They can connect to other publisher nodes and either
1213 * get some unwarranted data or can lead to ERRORs in connecting to
1214 * such nodes.
1215 */
1217
1218 /* Check and drop the required publications in the given database. */
1220
1221 create_subscription(conn, &dbinfo[i]);
1222
1223 /* Set the replication progress to the correct LSN */
1224 set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1225
1226 /* Enable subscription */
1227 enable_subscription(conn, &dbinfo[i]);
1228
1229 disconnect_database(conn, false);
1230 }
1231}
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)

References check_and_drop_existing_subscriptions(), check_and_drop_publications(), conn, connect_database(), create_subscription(), disconnect_database(), enable_subscription(), i, num_dbs, and set_replication_progress().

Referenced by main().

◆ start_standby_server()

static void start_standby_server ( const struct CreateSubscriberOptions *  opt,
bool  restricted_access,
bool  restrict_logical_worker 
)
static

Definition at line 1506 of file pg_createsubscriber.c.

1508{
1509 PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1510 int rc;
1511
1512 appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1513 appendShellString(pg_ctl_cmd, subscriber_dir);
1514 appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1515
1516 /* Prevent unintended slot invalidation */
1517 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1518
1519 if (restricted_access)
1520 {
1521 appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1522#if !defined(WIN32)
1523
1524 /*
1525 * An empty listen_addresses list means the server does not listen on
1526 * any IP interfaces; only Unix-domain sockets can be used to connect
1527 * to the server. Prevent external connections to minimize the chance
1528 * of failure.
1529 */
1530 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1531 if (opt->socket_dir)
1532 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1533 opt->socket_dir);
1534 appendPQExpBufferChar(pg_ctl_cmd, '"');
1535#endif
1536 }
1537 if (opt->config_file != NULL)
1538 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1539 opt->config_file);
1540
1541 /* Prevent logical replication workers from starting, if requested */
1542 if (restrict_logical_worker)
1543 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1544
1545 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1546 rc = system(pg_ctl_cmd->data);
1547 pg_ctl_status(pg_ctl_cmd->data, rc);
1548 standby_running = true;
1549 destroyPQExpBuffer(pg_ctl_cmd);
1550 pg_log_info("server was started");
1551}
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
void appendShellString(PQExpBuffer buf, const char *str)
Definition: string_utils.c:582

References appendPQExpBuffer(), appendPQExpBufferChar(), appendPQExpBufferStr(), appendShellString(), CreateSubscriberOptions::config_file, createPQExpBuffer(), PQExpBufferData::data, destroyPQExpBuffer(), pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, CreateSubscriberOptions::socket_dir, standby_running, CreateSubscriberOptions::sub_port, and subscriber_dir.

Referenced by main().

◆ stop_standby_server()

static void stop_standby_server ( const char *  datadir)
static

Definition at line 1554 of file pg_createsubscriber.c.

1555{
1556 char *pg_ctl_cmd;
1557 int rc;
1558
1559 pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1560 datadir);
1561 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1562 rc = system(pg_ctl_cmd);
1563 pg_ctl_status(pg_ctl_cmd, rc);
1564 standby_running = false;
1565 pg_log_info("server was stopped");
1566}

References datadir, pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, psprintf(), and standby_running.

Referenced by cleanup_objects_atexit(), main(), and wait_for_end_recovery().

◆ store_pub_sub_info()

static struct LogicalRepInfo * store_pub_sub_info ( const struct CreateSubscriberOptions *  opt,
const char *  pub_base_conninfo,
const char *  sub_base_conninfo 
)
static

Definition at line 466 of file pg_createsubscriber.c.

469{
470 struct LogicalRepInfo *dbinfo;
471 SimpleStringListCell *pubcell = NULL;
472 SimpleStringListCell *subcell = NULL;
473 SimpleStringListCell *replslotcell = NULL;
474 int i = 0;
475
476 dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
477
478 if (num_pubs > 0)
479 pubcell = opt->pub_names.head;
480 if (num_subs > 0)
481 subcell = opt->sub_names.head;
482 if (num_replslots > 0)
483 replslotcell = opt->replslot_names.head;
484
485 for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
486 {
487 char *conninfo;
488
489 /* Fill publisher attributes */
490 conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
491 dbinfo[i].pubconninfo = conninfo;
492 dbinfo[i].dbname = cell->val;
493 if (num_pubs > 0)
494 dbinfo[i].pubname = pubcell->val;
495 else
496 dbinfo[i].pubname = NULL;
497 if (num_replslots > 0)
498 dbinfo[i].replslotname = replslotcell->val;
499 else
500 dbinfo[i].replslotname = NULL;
501 dbinfo[i].made_replslot = false;
502 dbinfo[i].made_publication = false;
503 /* Fill subscriber attributes */
504 conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
505 dbinfo[i].subconninfo = conninfo;
506 if (num_subs > 0)
507 dbinfo[i].subname = subcell->val;
508 else
509 dbinfo[i].subname = NULL;
510 /* Other fields will be filled later */
511
512 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
513 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
514 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
515 dbinfo[i].pubconninfo);
516 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
517 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
518 dbinfo[i].subconninfo,
519 dbinfos.two_phase ? "true" : "false");
520
521 if (num_pubs > 0)
522 pubcell = pubcell->next;
523 if (num_subs > 0)
524 subcell = subcell->next;
525 if (num_replslots > 0)
526 replslotcell = replslotcell->next;
527
528 i++;
529 }
530
531 return dbinfo;
532}
#define pg_malloc_array(type, count)
Definition: fe_memutils.h:56
char val[FLEXIBLE_ARRAY_MEMBER]
Definition: simple_list.h:37

References concat_conninfo_dbname(), CreateSubscriberOptions::database_names, dbinfos, LogicalRepInfo::dbname, SimpleStringList::head, i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, SimpleStringListCell::next, num_dbs, num_pubs, num_replslots, num_subs, pg_log_debug, pg_malloc_array, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, CreateSubscriberOptions::replslot_names, LogicalRepInfo::replslotname, CreateSubscriberOptions::sub_names, LogicalRepInfo::subconninfo, LogicalRepInfo::subname, subname, LogicalRepInfos::two_phase, and SimpleStringListCell::val.

Referenced by main().

◆ usage()

static void usage ( void  )
static

Definition at line 237 of file pg_createsubscriber.c.

238{
239 printf(_("%s creates a new logical replica from a standby server.\n\n"),
240 progname);
241 printf(_("Usage:\n"));
242 printf(_(" %s [OPTION]...\n"), progname);
243 printf(_("\nOptions:\n"));
244 printf(_(" -a, --all create subscriptions for all databases except template\n"
245 " databases and databases that don't allow connections\n"));
246 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
247 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
248 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
249 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
250 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
251 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
252 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
253 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
254 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
255 printf(_(" -v, --verbose output verbose messages\n"));
256 printf(_(" --clean=OBJECTTYPE drop all objects of the specified type from specified\n"
257 " databases on the subscriber; accepts: \"%s\"\n"), "publications");
258 printf(_(" --config-file=FILENAME use specified main server configuration\n"
259 " file when running target cluster\n"));
260 printf(_(" --publication=NAME publication name\n"));
261 printf(_(" --replication-slot=NAME replication slot name\n"));
262 printf(_(" --subscription=NAME subscription name\n"));
263 printf(_(" -V, --version output version information, then exit\n"));
264 printf(_(" -?, --help show this help, then exit\n"));
265 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
266 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
267}
#define _(x)
Definition: elog.c:91
#define printf(...)
Definition: port.h:266

References _, DEFAULT_SUB_PORT, printf, and progname.

Referenced by main().

◆ wait_for_end_recovery()

static void wait_for_end_recovery ( const char *  conninfo,
const struct CreateSubscriberOptions *  opt 
)
static

Definition at line 1578 of file pg_createsubscriber.c.

1579{
1580 PGconn *conn;
1581 bool ready = false;
1582 int timer = 0;
1583
1584 pg_log_info("waiting for the target server to reach the consistent state");
1585
1586 conn = connect_database(conninfo, true);
1587
1588 for (;;)
1589 {
1590 /* Did the recovery process finish? We're done if so. */
1592 {
1593 ready = true;
1594 recovery_ended = true;
1595 break;
1596 }
1597
1598 /* Bail out after recovery_timeout seconds if this option is set */
1599 if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1600 {
1602 pg_log_error("recovery timed out");
1604 }
1605
1606 /* Keep waiting */
1608 timer += WAIT_INTERVAL;
1609 }
1610
1611 disconnect_database(conn, false);
1612
1613 if (!ready)
1614 pg_fatal("server did not end recovery");
1615
1616 pg_log_info("target server reached the consistent state");
1617 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1618}
#define USECS_PER_SEC
Definition: timestamp.h:134
#define pg_log_info_hint(...)
Definition: logging.h:130
#define WAIT_INTERVAL
void pg_usleep(long microsec)
Definition: signal.c:53

References conn, connect_database(), disconnect_database(), dry_run, pg_fatal, pg_log_error, pg_log_info, pg_log_info_hint, pg_usleep(), recovery_ended, CreateSubscriberOptions::recovery_timeout, server_is_in_recovery(), stop_standby_server(), subscriber_dir, USECS_PER_SEC, and WAIT_INTERVAL.

Referenced by main().

Variable Documentation

◆ dbinfos

◆ dry_run

◆ num_dbs

◆ num_pubs

int num_pubs = 0
static

Definition at line 144 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_replslots

int num_replslots = 0
static

Definition at line 146 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_subs

int num_subs = 0
static

Definition at line 145 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ pg_ctl_path

char* pg_ctl_path = NULL
static

Definition at line 150 of file pg_createsubscriber.c.

Referenced by main(), start_standby_server(), and stop_standby_server().

◆ pg_resetwal_path

char* pg_resetwal_path = NULL
static

Definition at line 151 of file pg_createsubscriber.c.

Referenced by main(), and modify_subscriber_sysid().

◆ primary_slot_name

char* primary_slot_name = NULL
static

Definition at line 137 of file pg_createsubscriber.c.

Referenced by check_subscriber(), drop_primary_replication_slot(), and main().

◆ prng_state

pg_prng_state prng_state
static

Definition at line 148 of file pg_createsubscriber.c.

Referenced by generate_object_name(), and setup_publisher().

◆ progname

const char* progname
static

Definition at line 135 of file pg_createsubscriber.c.

Referenced by get_exec_path(), get_sub_conninfo(), main(), and usage().

◆ recovery_ended

bool recovery_ended = false
static

Definition at line 156 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and wait_for_end_recovery().

◆ standby_running

bool standby_running = false
static

◆ subscriber_dir

char* subscriber_dir = NULL
static

◆ success

bool success = false
static

Definition at line 140 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and main().