PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
pg_createsubscriber.c File Reference
#include "postgres_fe.h"
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/wait.h>
#include "common/connect.h"
#include "common/controldata_utils.h"
#include "common/logging.h"
#include "common/pg_prng.h"
#include "common/restricted_token.h"
#include "fe_utils/recovery_gen.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
#include "getopt_long.h"
Include dependency graph for pg_createsubscriber.c:

Go to the source code of this file.

Data Structures

struct  CreateSubscriberOptions
 
struct  LogicalRepInfo
 
struct  LogicalRepInfos
 

Macros

#define DEFAULT_SUB_PORT   "50432"
 
#define OBJECTTYPE_PUBLICATIONS   0x0001
 
#define USEC_PER_SEC   1000000
 
#define WAIT_INTERVAL   1 /* 1 second */
 

Enumerations

enum  WaitPMResult {
  POSTMASTER_READY , POSTMASTER_STILL_STARTING ,
  POSTMASTER_SHUTDOWN_IN_RECOVERY , POSTMASTER_FAILED
}
 

Functions

static void cleanup_objects_atexit (void)
 
static void usage ()
 
static char * get_base_conninfo (const char *conninfo, char **dbname)
 
static char * get_sub_conninfo (const struct CreateSubscriberOptions *opt)
 
static char * get_exec_path (const char *argv0, const char *progname)
 
static void check_data_directory (const char *datadir)
 
static char * concat_conninfo_dbname (const char *conninfo, const char *dbname)
 
static struct LogicalRepInfo * store_pub_sub_info (const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
 
static PGconn * connect_database (const char *conninfo, bool exit_on_error)
 
static void disconnect_database (PGconn *conn, bool exit_on_error)
 
static uint64 get_primary_sysid (const char *conninfo)
 
static uint64 get_standby_sysid (const char *datadir)
 
static void modify_subscriber_sysid (const struct CreateSubscriberOptions *opt)
 
static bool server_is_in_recovery (PGconn *conn)
 
static char * generate_object_name (PGconn *conn)
 
static void check_publisher (const struct LogicalRepInfo *dbinfo)
 
static char * setup_publisher (struct LogicalRepInfo *dbinfo)
 
static void check_subscriber (const struct LogicalRepInfo *dbinfo)
 
static void setup_subscriber (struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 
static void setup_recovery (const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
 
static void drop_primary_replication_slot (struct LogicalRepInfo *dbinfo, const char *slotname)
 
static void drop_failover_replication_slots (struct LogicalRepInfo *dbinfo)
 
static char * create_logical_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void drop_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
 
static void pg_ctl_status (const char *pg_ctl_cmd, int rc)
 
static void start_standby_server (const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
 
static void stop_standby_server (const char *datadir)
 
static void wait_for_end_recovery (const char *conninfo, const struct CreateSubscriberOptions *opt)
 
static void create_publication (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void drop_publication (PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
 
static void check_and_drop_publications (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void create_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void set_replication_progress (PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
 
static void enable_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void check_and_drop_existing_subscriptions (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void drop_existing_subscriptions (PGconn *conn, const char *subname, const char *dbname)
 
static void get_publisher_databases (struct CreateSubscriberOptions *opt, bool dbnamespecified)
 
static void appendConnStrItem (PQExpBuffer buf, const char *keyword, const char *val)
 
int main (int argc, char **argv)
 

Variables

static const char * progname
 
static char * primary_slot_name = NULL
 
static bool dry_run = false
 
static bool success = false
 
static struct LogicalRepInfos dbinfos
 
static int num_dbs = 0
 
static int num_pubs = 0
 
static int num_subs = 0
 
static int num_replslots = 0
 
static pg_prng_state prng_state
 
static char * pg_ctl_path = NULL
 
static char * pg_resetwal_path = NULL
 
static char * subscriber_dir = NULL
 
static bool recovery_ended = false
 
static bool standby_running = false
 

Macro Definition Documentation

◆ DEFAULT_SUB_PORT

#define DEFAULT_SUB_PORT   "50432"

Definition at line 31 of file pg_createsubscriber.c.

◆ OBJECTTYPE_PUBLICATIONS

#define OBJECTTYPE_PUBLICATIONS   0x0001

Definition at line 32 of file pg_createsubscriber.c.

◆ USEC_PER_SEC

#define USEC_PER_SEC   1000000

Definition at line 131 of file pg_createsubscriber.c.

◆ WAIT_INTERVAL

#define WAIT_INTERVAL   1 /* 1 second */

Definition at line 132 of file pg_createsubscriber.c.

Enumeration Type Documentation

◆ WaitPMResult

Enumerator
POSTMASTER_READY 
POSTMASTER_STILL_STARTING 
POSTMASTER_SHUTDOWN_IN_RECOVERY 
POSTMASTER_FAILED 

Definition at line 158 of file pg_createsubscriber.c.

159{
162};
@ POSTMASTER_READY
@ POSTMASTER_STILL_STARTING

Function Documentation

◆ appendConnStrItem()

static void appendConnStrItem ( PQExpBuffer  buf,
const char *  keyword,
const char *  val 
)
static

Definition at line 279 of file pg_createsubscriber.c.

280{
281 if (buf->len > 0)
283 appendPQExpBufferStr(buf, keyword);
286}
long val
Definition: informix.c:689
static char * buf
Definition: pg_test_fsync.c:72
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
void appendConnStrVal(PQExpBuffer buf, const char *str)
Definition: string_utils.c:698

References appendConnStrVal(), appendPQExpBufferChar(), appendPQExpBufferStr(), buf, and val.

Referenced by concat_conninfo_dbname(), get_base_conninfo(), and get_sub_conninfo().

◆ check_and_drop_existing_subscriptions()

static void check_and_drop_existing_subscriptions ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1151 of file pg_createsubscriber.c.

1153{
1155 char *dbname;
1156 PGresult *res;
1157
1158 Assert(conn != NULL);
1159
1160 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1161
1162 appendPQExpBuffer(query,
1163 "SELECT s.subname FROM pg_catalog.pg_subscription s "
1164 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1165 "WHERE d.datname = %s",
1166 dbname);
1167 res = PQexec(conn, query->data);
1168
1169 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1170 {
1171 pg_log_error("could not obtain pre-existing subscriptions: %s",
1174 }
1175
1176 for (int i = 0; i < PQntuples(res); i++)
1178 dbinfo->dbname);
1179
1180 PQclear(res);
1181 destroyPQExpBuffer(query);
1183}
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
void PQclear(PGresult *res)
Definition: fe-exec.c:721
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4363
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
Assert(PointerIsAligned(start, uint64))
int i
Definition: isn.c:77
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:128
#define pg_log_error(...)
Definition: logging.h:106
static void drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
static void disconnect_database(PGconn *conn, bool exit_on_error)
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
char * dbname
Definition: streamutil.c:49
PGconn * conn
Definition: streamutil.c:52

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), PQExpBufferData::data, LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), drop_existing_subscriptions(), i, pg_log_error, PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), and PQresultStatus().

Referenced by setup_subscriber().

◆ check_and_drop_publications()

static void check_and_drop_publications ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1730 of file pg_createsubscriber.c.

1731{
1732 PGresult *res;
1734
1735 Assert(conn != NULL);
1736
1737 if (drop_all_pubs)
1738 {
1739 pg_log_info("dropping all existing publications in database \"%s\"",
1740 dbinfo->dbname);
1741
1742 /* Fetch all publication names */
1743 res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
1744 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1745 {
1746 pg_log_error("could not obtain publication information: %s",
1748 PQclear(res);
1750 }
1751
1752 /* Drop each publication */
1753 for (int i = 0; i < PQntuples(res); i++)
1754 drop_publication(conn, PQgetvalue(res, i, 0), dbinfo->dbname,
1755 &dbinfo->made_publication);
1756
1757 PQclear(res);
1758 }
1759
1760 /*
1761 * In dry-run mode, we don't create publications, but we still try to drop
1762 * those to provide necessary information to the user.
1763 */
1764 if (!drop_all_pubs || dry_run)
1765 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
1766 &dbinfo->made_publication);
1767}
#define pg_log_info(...)
Definition: logging.h:124
static struct LogicalRepInfos dbinfos
static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication)
static bool dry_run
#define OBJECTTYPE_PUBLICATIONS

References Assert(), conn, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), dry_run, i, LogicalRepInfo::made_publication, OBJECTTYPE_PUBLICATIONS, LogicalRepInfos::objecttypes_to_remove, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), and LogicalRepInfo::pubname.

Referenced by setup_subscriber().

◆ check_data_directory()

static void check_data_directory ( const char *  datadir)
static

Definition at line 407 of file pg_createsubscriber.c.

408{
409 struct stat statbuf;
410 char versionfile[MAXPGPATH];
411
412 pg_log_info("checking if directory \"%s\" is a cluster data directory",
413 datadir);
414
415 if (stat(datadir, &statbuf) != 0)
416 {
417 if (errno == ENOENT)
418 pg_fatal("data directory \"%s\" does not exist", datadir);
419 else
420 pg_fatal("could not access directory \"%s\": %m", datadir);
421 }
422
423 snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
424 if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
425 {
426 pg_fatal("directory \"%s\" is not a database cluster directory",
427 datadir);
428 }
429}
#define pg_fatal(...)
#define MAXPGPATH
char * datadir
#define snprintf
Definition: port.h:239
#define stat
Definition: win32_port.h:274

References datadir, MAXPGPATH, pg_fatal, pg_log_info, snprintf, and stat.

Referenced by main().

◆ check_publisher()

static void check_publisher ( const struct LogicalRepInfo *  dbinfo)
static

Definition at line 870 of file pg_createsubscriber.c.

871{
872 PGconn *conn;
873 PGresult *res;
874 bool failed = false;
875
876 char *wal_level;
877 int max_repslots;
878 int cur_repslots;
879 int max_walsenders;
880 int cur_walsenders;
881 int max_prepared_transactions;
882 char *max_slot_wal_keep_size;
883
884 pg_log_info("checking settings on publisher");
885
886 conn = connect_database(dbinfo[0].pubconninfo, true);
887
888 /*
889 * If the primary server is in recovery (i.e. cascading replication),
890 * objects (publication) cannot be created because it is read only.
891 */
893 {
894 pg_log_error("primary server cannot be in recovery");
896 }
897
898 /*------------------------------------------------------------------------
899 * Logical replication requires a few parameters to be set on publisher.
900 * Since these parameters are not a requirement for physical replication,
901 * we should check it to make sure it won't fail.
902 *
903 * - wal_level = logical
904 * - max_replication_slots >= current + number of dbs to be converted
905 * - max_wal_senders >= current + number of dbs to be converted
906 * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
907 * -----------------------------------------------------------------------
908 */
909 res = PQexec(conn,
910 "SELECT pg_catalog.current_setting('wal_level'),"
911 " pg_catalog.current_setting('max_replication_slots'),"
912 " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
913 " pg_catalog.current_setting('max_wal_senders'),"
914 " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
915 " pg_catalog.current_setting('max_prepared_transactions'),"
916 " pg_catalog.current_setting('max_slot_wal_keep_size')");
917
919 {
920 pg_log_error("could not obtain publisher settings: %s",
923 }
924
925 wal_level = pg_strdup(PQgetvalue(res, 0, 0));
926 max_repslots = atoi(PQgetvalue(res, 0, 1));
927 cur_repslots = atoi(PQgetvalue(res, 0, 2));
928 max_walsenders = atoi(PQgetvalue(res, 0, 3));
929 cur_walsenders = atoi(PQgetvalue(res, 0, 4));
930 max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
931 max_slot_wal_keep_size = pg_strdup(PQgetvalue(res, 0, 6));
932
933 PQclear(res);
934
935 pg_log_debug("publisher: wal_level: %s", wal_level);
936 pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
937 pg_log_debug("publisher: current replication slots: %d", cur_repslots);
938 pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
939 pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
940 pg_log_debug("publisher: max_prepared_transactions: %d",
941 max_prepared_transactions);
942 pg_log_debug("publisher: max_slot_wal_keep_size: %s",
943 max_slot_wal_keep_size);
944
946
947 if (strcmp(wal_level, "logical") != 0)
948 {
949 pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
950 failed = true;
951 }
952
953 if (max_repslots - cur_repslots < num_dbs)
954 {
955 pg_log_error("publisher requires %d replication slots, but only %d remain",
956 num_dbs, max_repslots - cur_repslots);
957 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
958 "max_replication_slots", cur_repslots + num_dbs);
959 failed = true;
960 }
961
962 if (max_walsenders - cur_walsenders < num_dbs)
963 {
964 pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
965 num_dbs, max_walsenders - cur_walsenders);
966 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
967 "max_wal_senders", cur_walsenders + num_dbs);
968 failed = true;
969 }
970
971 if (max_prepared_transactions != 0 && !dbinfos.two_phase)
972 {
973 pg_log_warning("two_phase option will not be enabled for replication slots");
974 pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
975 "Prepared transactions will be replicated at COMMIT PREPARED.");
976 pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase.");
977 }
978
979 /*
980 * Validate 'max_slot_wal_keep_size'. If this parameter is set to a
981 * non-default value, it may cause replication failures due to required
982 * WAL files being prematurely removed.
983 */
984 if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
985 {
986 pg_log_warning("required WAL could be removed from the publisher");
987 pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
988 "max_slot_wal_keep_size");
989 }
990
992
993 if (failed)
994 exit(1);
995}
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_warning_hint(...)
Definition: logging.h:121
#define pg_log_warning_detail(...)
Definition: logging.h:118
#define pg_log_debug(...)
Definition: logging.h:133
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
#define pg_log_warning(...)
Definition: pgfnames.c:24
int wal_level
Definition: xlog.c:131

References conn, connect_database(), dbinfos, disconnect_database(), dry_run, num_dbs, pg_free(), pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_log_warning, pg_log_warning_detail, pg_log_warning_hint, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQresultErrorMessage(), PQresultStatus(), server_is_in_recovery(), LogicalRepInfos::two_phase, and wal_level.

Referenced by main().

◆ check_subscriber()

static void check_subscriber ( const struct LogicalRepInfo *  dbinfo)
static

Definition at line 1009 of file pg_createsubscriber.c.

1010{
1011 PGconn *conn;
1012 PGresult *res;
1013 bool failed = false;
1014
1015 int max_lrworkers;
1016 int max_reporigins;
1017 int max_wprocs;
1018
1019 pg_log_info("checking settings on subscriber");
1020
1021 conn = connect_database(dbinfo[0].subconninfo, true);
1022
1023 /* The target server must be a standby */
1025 {
1026 pg_log_error("target server must be a standby");
1028 }
1029
1030 /*------------------------------------------------------------------------
1031 * Logical replication requires a few parameters to be set on subscriber.
1032 * Since these parameters are not a requirement for physical replication,
1033 * we should check it to make sure it won't fail.
1034 *
1035 * - max_active_replication_origins >= number of dbs to be converted
1036 * - max_logical_replication_workers >= number of dbs to be converted
1037 * - max_worker_processes >= 1 + number of dbs to be converted
1038 *------------------------------------------------------------------------
1039 */
1040 res = PQexec(conn,
1041 "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
1042 "'max_logical_replication_workers', "
1043 "'max_active_replication_origins', "
1044 "'max_worker_processes', "
1045 "'primary_slot_name') "
1046 "ORDER BY name");
1047
1048 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1049 {
1050 pg_log_error("could not obtain subscriber settings: %s",
1053 }
1054
1055 max_reporigins = atoi(PQgetvalue(res, 0, 0));
1056 max_lrworkers = atoi(PQgetvalue(res, 1, 0));
1057 max_wprocs = atoi(PQgetvalue(res, 2, 0));
1058 if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1060
1061 pg_log_debug("subscriber: max_logical_replication_workers: %d",
1062 max_lrworkers);
1063 pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
1064 pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1066 pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1067
1068 PQclear(res);
1069
1070 disconnect_database(conn, false);
1071
1072 if (max_reporigins < num_dbs)
1073 {
1074 pg_log_error("subscriber requires %d active replication origins, but only %d remain",
1075 num_dbs, max_reporigins);
1076 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1077 "max_active_replication_origins", num_dbs);
1078 failed = true;
1079 }
1080
1081 if (max_lrworkers < num_dbs)
1082 {
1083 pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1084 num_dbs, max_lrworkers);
1085 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1086 "max_logical_replication_workers", num_dbs);
1087 failed = true;
1088 }
1089
1090 if (max_wprocs < num_dbs + 1)
1091 {
1092 pg_log_error("subscriber requires %d worker processes, but only %d remain",
1093 num_dbs + 1, max_wprocs);
1094 pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1095 "max_worker_processes", num_dbs + 1);
1096 failed = true;
1097 }
1098
1099 if (failed)
1100 exit(1);
1101}
static char * primary_slot_name

References conn, connect_database(), disconnect_database(), num_dbs, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQresultErrorMessage(), PQresultStatus(), primary_slot_name, and server_is_in_recovery().

Referenced by main().

◆ cleanup_objects_atexit()

static void cleanup_objects_atexit ( void  )
static

Definition at line 177 of file pg_createsubscriber.c.

178{
179 if (success)
180 return;
181
182 /*
183 * If the server is promoted, there is no way to use the current setup
184 * again. Warn the user that a new replication setup should be done before
185 * trying again.
186 */
187 if (recovery_ended)
188 {
189 pg_log_warning("failed after the end of recovery");
190 pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
191 "You must recreate the physical replica before continuing.");
192 }
193
194 for (int i = 0; i < num_dbs; i++)
195 {
196 struct LogicalRepInfo *dbinfo = &dbinfos.dbinfo[i];
197
198 if (dbinfo->made_publication || dbinfo->made_replslot)
199 {
200 PGconn *conn;
201
202 conn = connect_database(dbinfo->pubconninfo, false);
203 if (conn != NULL)
204 {
205 if (dbinfo->made_publication)
206 drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
207 &dbinfo->made_publication);
208 if (dbinfo->made_replslot)
209 drop_replication_slot(conn, dbinfo, dbinfo->replslotname);
211 }
212 else
213 {
214 /*
215 * If a connection could not be established, inform the user
216 * that some objects were left on primary and should be
217 * removed before trying again.
218 */
219 if (dbinfo->made_publication)
220 {
221 pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
222 dbinfo->pubname,
223 dbinfo->dbname);
224 pg_log_warning_hint("Drop this publication before trying again.");
225 }
226 if (dbinfo->made_replslot)
227 {
228 pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
229 dbinfo->replslotname,
230 dbinfo->dbname);
231 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
232 }
233 }
234 }
235 }
236
237 if (standby_running)
239}
static void stop_standby_server(const char *datadir)
static char * subscriber_dir
static bool success
static bool recovery_ended
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running
struct LogicalRepInfo * dbinfo

References conn, connect_database(), LogicalRepInfos::dbinfo, dbinfos, LogicalRepInfo::dbname, disconnect_database(), drop_publication(), drop_replication_slot(), i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, num_dbs, pg_log_warning, pg_log_warning_hint, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, recovery_ended, LogicalRepInfo::replslotname, standby_running, stop_standby_server(), subscriber_dir, and success.

Referenced by main().

◆ concat_conninfo_dbname()

static char * concat_conninfo_dbname ( const char *  conninfo,
const char *  dbname 
)
static

Definition at line 439 of file pg_createsubscriber.c.

440{
442 char *ret;
443
444 Assert(conninfo != NULL);
445
446 appendPQExpBufferStr(buf, conninfo);
447 appendConnStrItem(buf, "dbname", dbname);
448
449 ret = pg_strdup(buf->data);
451
452 return ret;
453}
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)

References appendConnStrItem(), appendPQExpBufferStr(), Assert(), buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), and pg_strdup().

Referenced by get_publisher_databases(), and store_pub_sub_info().

◆ connect_database()

static PGconn * connect_database ( const char *  conninfo,
bool  exit_on_error 
)
static

Definition at line 536 of file pg_createsubscriber.c.

537{
538 PGconn *conn;
539 PGresult *res;
540
541 conn = PQconnectdb(conninfo);
543 {
544 pg_log_error("connection to database failed: %s",
546 PQfinish(conn);
547
548 if (exit_on_error)
549 exit(1);
550 return NULL;
551 }
552
553 /* Secure search_path */
556 {
557 pg_log_error("could not clear \"search_path\": %s",
559 PQclear(res);
560 PQfinish(conn);
561
562 if (exit_on_error)
563 exit(1);
564 return NULL;
565 }
566 PQclear(res);
567
568 return conn;
569}
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:813
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7556
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5290
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7619
@ CONNECTION_OK
Definition: libpq-fe.h:84

References ALWAYS_SECURE_SEARCH_PATH_SQL, conn, CONNECTION_OK, pg_log_error, PGRES_TUPLES_OK, PQclear(), PQconnectdb(), PQerrorMessage(), PQexec(), PQfinish(), PQresultErrorMessage(), PQresultStatus(), and PQstatus().

Referenced by check_publisher(), check_subscriber(), cleanup_objects_atexit(), drop_failover_replication_slots(), drop_primary_replication_slot(), get_primary_sysid(), get_publisher_databases(), setup_publisher(), setup_recovery(), setup_subscriber(), and wait_for_end_recovery().

◆ create_logical_replication_slot()

static char * create_logical_replication_slot ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1359 of file pg_createsubscriber.c.

1360{
1362 PGresult *res = NULL;
1363 const char *slot_name = dbinfo->replslotname;
1364 char *slot_name_esc;
1365 char *lsn = NULL;
1366
1367 Assert(conn != NULL);
1368
1369 pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
1370 slot_name, dbinfo->dbname);
1371
1372 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1373
1375 "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)",
1376 slot_name_esc,
1377 dbinfos.two_phase ? "true" : "false");
1378
1379 PQfreemem(slot_name_esc);
1380
1381 pg_log_debug("command is: %s", str->data);
1382
1383 if (!dry_run)
1384 {
1385 res = PQexec(conn, str->data);
1386 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1387 {
1388 pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1389 slot_name, dbinfo->dbname,
1391 PQclear(res);
1393 return NULL;
1394 }
1395
1396 lsn = pg_strdup(PQgetvalue(res, 0, 0));
1397 PQclear(res);
1398 }
1399
1400 /* For cleanup purposes */
1401 dbinfo->made_replslot = true;
1402
1404
1405 return lsn;
1406}
const char * str

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_replslot, pg_log_debug, pg_log_error, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::replslotname, str, and LogicalRepInfos::two_phase.

Referenced by setup_publisher().

◆ create_publication()

static void create_publication ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1603 of file pg_createsubscriber.c.

1604{
1606 PGresult *res;
1607 char *ipubname_esc;
1608 char *spubname_esc;
1609
1610 Assert(conn != NULL);
1611
1612 ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1613 spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1614
1615 /* Check if the publication already exists */
1617 "SELECT 1 FROM pg_catalog.pg_publication "
1618 "WHERE pubname = %s",
1619 spubname_esc);
1620 res = PQexec(conn, str->data);
1621 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1622 {
1623 pg_log_error("could not obtain publication information: %s",
1626 }
1627
1628 if (PQntuples(res) == 1)
1629 {
1630 /*
1631 * Unfortunately, if it reaches this code path, it will always fail
1632 * (unless you decide to change the existing publication name). That's
1633 * bad but it is very unlikely that the user will choose a name with
1634 * pg_createsubscriber_ prefix followed by the exact database oid and
1635 * a random number.
1636 */
1637 pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1638 pg_log_error_hint("Consider renaming this publication before continuing.");
1640 }
1641
1642 PQclear(res);
1644
1645 pg_log_info("creating publication \"%s\" in database \"%s\"",
1646 dbinfo->pubname, dbinfo->dbname);
1647
1648 appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1649 ipubname_esc);
1650
1651 pg_log_debug("command is: %s", str->data);
1652
1653 if (!dry_run)
1654 {
1655 res = PQexec(conn, str->data);
1656 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1657 {
1658 pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1659 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
1661 }
1662 PQclear(res);
1663 }
1664
1665 /* For cleanup purposes */
1666 dbinfo->made_publication = true;
1667
1668 PQfreemem(ipubname_esc);
1669 PQfreemem(spubname_esc);
1671}
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4369
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, LogicalRepInfo::made_publication, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::pubname, resetPQExpBuffer(), and str.

Referenced by setup_publisher().

◆ create_subscription()

static void create_subscription ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1781 of file pg_createsubscriber.c.

1782{
1784 PGresult *res;
1785 char *pubname_esc;
1786 char *subname_esc;
1787 char *pubconninfo_esc;
1788 char *replslotname_esc;
1789
1790 Assert(conn != NULL);
1791
1792 pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1793 subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1794 pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1795 replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1796
1797 pg_log_info("creating subscription \"%s\" in database \"%s\"",
1798 dbinfo->subname, dbinfo->dbname);
1799
1801 "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1802 "WITH (create_slot = false, enabled = false, "
1803 "slot_name = %s, copy_data = false, two_phase = %s)",
1804 subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
1805 dbinfos.two_phase ? "true" : "false");
1806
1807 PQfreemem(pubname_esc);
1808 PQfreemem(subname_esc);
1809 PQfreemem(pubconninfo_esc);
1810 PQfreemem(replslotname_esc);
1811
1812 pg_log_debug("command is: %s", str->data);
1813
1814 if (!dry_run)
1815 {
1816 res = PQexec(conn, str->data);
1817 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1818 {
1819 pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1820 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
1822 }
1823 PQclear(res);
1824 }
1825
1827}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), dbinfos, LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, str, LogicalRepInfo::subname, and LogicalRepInfos::two_phase.

Referenced by setup_subscriber().

◆ disconnect_database()

◆ drop_existing_subscriptions()

static void drop_existing_subscriptions ( PGconn *  conn,
const char *  subname,
const char *  dbname 
)
static

Definition at line 1110 of file pg_createsubscriber.c.

/*
 * drop_existing_subscriptions
 *
 * Build one multi-statement command that disables subscription "subname",
 * sets its slot_name to NONE and then drops it, and send it over "conn".
 * "dbname" is used only in the log message. When dry_run is set nothing is
 * sent to the server.
 *
 * NOTE(review): this listing omits source lines 1112 and 1137-1138. Going by
 * the References list they presumably create "query" with
 * createPQExpBuffer(), finish the pg_log_error() call and call
 * disconnect_database() -- confirm against the real file.
 * NOTE(review): subname is interpolated with plain %s (no
 * PQescapeIdentifier() here); presumably the caller supplies an already
 * quoted name -- verify.
 */
1111{
1113 PGresult *res;
1114
1115 Assert(conn != NULL);
1116
1117 /*
1118 * Construct a query string. These commands are allowed to be executed
1119 * within a transaction.
1120 */
1121 appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1122 subname);
1123 appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1124 subname);
1125 appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1126
1127 pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1128 subname, dbname);
1129
1130 if (!dry_run)
1131 {
1132 res = PQexec(conn, query->data);
1133
1134 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1135 {
1136 pg_log_error("could not drop subscription \"%s\": %s",
1139 }
1140
1141 PQclear(res);
1142 }
1143
1144 destroyPQExpBuffer(query);
1145}
NameData subname

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), PQExpBufferData::data, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultErrorMessage(), PQresultStatus(), and subname.

Referenced by check_and_drop_existing_subscriptions().

◆ drop_failover_replication_slots()

static void drop_failover_replication_slots ( struct LogicalRepInfo *  dbinfo)
static

Definition at line 1317 of file pg_createsubscriber.c.

/*
 * drop_failover_replication_slots
 *
 * Connect using the subscriber connection string of the first dbinfo entry,
 * list every row of pg_replication_slots whose "failover" column is true and
 * hand each slot name to drop_replication_slot().
 *
 * Every failure here is reported as a warning plus a hint rather than as a
 * fatal error: the connection is opened and closed with exit_on_error =
 * false, and a failed query only logs.
 *
 * NOTE(review): this listing omits source line 1338, presumably the
 * PQresultErrorMessage(res) argument that completes the pg_log_warning()
 * call -- confirm against the real file.
 */
1318{
1319 PGconn *conn;
1320 PGresult *res;
1321
1322 conn = connect_database(dbinfo[0].subconninfo, false);
1323 if (conn != NULL)
1324 {
1325 /* Get failover replication slot names */
1326 res = PQexec(conn,
1327 "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1328
1329 if (PQresultStatus(res) == PGRES_TUPLES_OK)
1330 {
1331 /* Remove failover replication slots from subscriber */
1332 for (int i = 0; i < PQntuples(res); i++)
1333 drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0));
1334 }
1335 else
1336 {
1337 pg_log_warning("could not obtain failover replication slot information: %s",
1339 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1340 }
1341
1342 PQclear(res);
1343 disconnect_database(conn, false);
1344 }
1345 else
1346 {
1347 pg_log_warning("could not drop failover replication slot");
1348 pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1349 }
1350}

References conn, connect_database(), disconnect_database(), drop_replication_slot(), i, pg_log_warning, pg_log_warning_hint, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), and PQresultStatus().

Referenced by main().

◆ drop_primary_replication_slot()

static void drop_primary_replication_slot ( struct LogicalRepInfo *  dbinfo,
const char *  slotname 
)
static

Definition at line 1287 of file pg_createsubscriber.c.

/*
 * drop_primary_replication_slot
 *
 * Drop replication slot "slotname" through a connection opened with the
 * publisher connection string of the first dbinfo entry. If that connection
 * cannot be opened, a warning and a hint are logged instead; nothing here is
 * fatal (exit_on_error is false for both connect and disconnect).
 *
 * NOTE(review): the early-return guard tests the file-level variable
 * primary_slot_name, not the slotname parameter. That is equivalent only if
 * callers always pass primary_slot_name -- verify against main().
 */
1288{
1289 PGconn *conn;
1290
1291 /* Replication slot does not exist, do nothing */
1292 if (!primary_slot_name)
1293 return;
1294
1295 conn = connect_database(dbinfo[0].pubconninfo, false);
1296 if (conn != NULL)
1297 {
1298 drop_replication_slot(conn, &dbinfo[0], slotname);
1299 disconnect_database(conn, false);
1300 }
1301 else
1302 {
1303 pg_log_warning("could not drop replication slot \"%s\" on primary",
1304 slotname);
1305 pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1306 }
1307}

References conn, connect_database(), disconnect_database(), drop_replication_slot(), pg_log_warning, pg_log_warning_hint, and primary_slot_name.

Referenced by main().

◆ drop_publication()

static void drop_publication ( PGconn *  conn,
const char *  pubname,
const char *  dbname,
bool *  made_publication 
)
static

Definition at line 1677 of file pg_createsubscriber.c.

/*
 * drop_publication
 *
 * Issue DROP PUBLICATION for "pubname" on "conn". The name is quoted with
 * PQescapeIdentifier() before being placed in the command; "dbname" is used
 * only in log messages. When dry_run is set the command is built and logged
 * at debug level but not executed.
 *
 * On failure an error is logged and *made_publication is set to false; the
 * function does not disconnect or exit (see the comment in the error branch).
 *
 * NOTE(review): this listing omits source lines 1680 and 1717. Going by the
 * References list they presumably create "str" with createPQExpBuffer() and
 * release it with destroyPQExpBuffer() -- confirm against the real file.
 */
1679{
1681 PGresult *res;
1682 char *pubname_esc;
1683
1684 Assert(conn != NULL);
1685
1686 pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
1687
1688 pg_log_info("dropping publication \"%s\" in database \"%s\"",
1689 pubname, dbname);
1690
1691 appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1692
1693 PQfreemem(pubname_esc);
1694
1695 pg_log_debug("command is: %s", str->data);
1696
1697 if (!dry_run)
1698 {
1699 res = PQexec(conn, str->data);
1700 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1701 {
1702 pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1703 pubname, dbname, PQresultErrorMessage(res));
1704 *made_publication = false; /* don't try again. */
1705
1706 /*
1707 * Don't disconnect and exit here. This routine is used by primary
1708 * (cleanup publication / replication slot due to an error) and
1709 * subscriber (remove the replicated publications). In both cases,
1710 * it can continue and provide instructions for the user to remove
1711 * it later if cleanup fails.
1712 */
1713 }
1714 PQclear(res);
1715 }
1716
1718}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), dbname, destroyPQExpBuffer(), dry_run, pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage(), PQresultStatus(), and str.

Referenced by check_and_drop_publications(), and cleanup_objects_atexit().

◆ drop_replication_slot()

static void drop_replication_slot ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo,
const char *  slot_name 
)
static

Definition at line 1409 of file pg_createsubscriber.c.

/*
 * drop_replication_slot
 *
 * Drop replication slot "slot_name" by running
 * pg_catalog.pg_drop_replication_slot() on "conn". The slot name is passed as
 * a literal quoted with PQescapeLiteral(). dbinfo->dbname is used for log
 * messages. When dry_run is set the command is built and logged at debug
 * level but not executed.
 *
 * Because the command is a SELECT, success is PGRES_TUPLES_OK. On failure an
 * error is logged and dbinfo->made_replslot is cleared; the function does not
 * exit.
 *
 * NOTE(review): this listing omits source lines 1412 and 1442. Going by the
 * References list they presumably create "str" with createPQExpBuffer() and
 * release it with destroyPQExpBuffer() -- confirm against the real file.
 */
1411{
1413 char *slot_name_esc;
1414 PGresult *res;
1415
1416 Assert(conn != NULL);
1417
1418 pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1419 slot_name, dbinfo->dbname);
1420
1421 slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1422
1423 appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1424
1425 PQfreemem(slot_name_esc);
1426
1427 pg_log_debug("command is: %s", str->data);
1428
1429 if (!dry_run)
1430 {
1431 res = PQexec(conn, str->data);
1432 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1433 {
1434 pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1435 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1436 dbinfo->made_replslot = false; /* don't try again. */
1437 }
1438
1439 PQclear(res);
1440 }
1441
1443}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_replslot, pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQfreemem(), PQresultErrorMessage(), PQresultStatus(), and str.

Referenced by cleanup_objects_atexit(), drop_failover_replication_slots(), and drop_primary_replication_slot().

◆ enable_subscription()

static void enable_subscription ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo
)
static

Definition at line 1931 of file pg_createsubscriber.c.

/*
 * enable_subscription
 *
 * Run ALTER SUBSCRIPTION ... ENABLE for dbinfo->subname on "conn". The name
 * is quoted with PQescapeIdentifier(). When dry_run is set the command is
 * built and logged at debug level but not executed.
 *
 * NOTE(review): this listing omits source lines 1933, 1955 and 1961-1962.
 * Going by the References list they presumably create "str" with
 * createPQExpBuffer(), call disconnect_database() in the error branch, and
 * release "subname" with PQfreemem() and "str" with destroyPQExpBuffer() --
 * confirm against the real file before concluding anything leaks.
 */
1932{
1934 PGresult *res;
1935 char *subname;
1936
1937 Assert(conn != NULL);
1938
1939 subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1940
1941 pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1942 dbinfo->subname, dbinfo->dbname);
1943
1944 appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1945
1946 pg_log_debug("command is: %s", str->data);
1947
1948 if (!dry_run)
1949 {
1950 res = PQexec(conn, str->data);
1951 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1952 {
1953 pg_log_error("could not enable subscription \"%s\": %s",
1954 dbinfo->subname, PQresultErrorMessage(res));
1956 }
1957
1958 PQclear(res);
1959 }
1960
1963}

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQescapeIdentifier(), PQexec(), PQfreemem(), PQresultErrorMessage(), PQresultStatus(), str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ generate_object_name()

static char * generate_object_name ( PGconn *  conn)
static

Definition at line 714 of file pg_createsubscriber.c.

/*
 * generate_object_name
 *
 * Return a psprintf()-allocated name of the form
 * "pg_createsubscriber_<database OID>_<random hex>". The OID is that of the
 * database "conn" is connected to (looked up in pg_database via
 * current_database()); the random part is a 32-bit value drawn from the
 * file-level prng_state.
 *
 * NOTE(review): this listing omits source lines 724, 727-728 and 735. Going
 * by the References list they presumably hold the PQresultStatus() check, the
 * rest of the error report and disconnect_database() calls -- confirm against
 * the real file.
 */
715{
716 PGresult *res;
717 Oid oid;
718 uint32 rand;
719 char *objname;
720
721 res = PQexec(conn,
722 "SELECT oid FROM pg_catalog.pg_database "
723 "WHERE datname = pg_catalog.current_database()");
725 {
726 pg_log_error("could not obtain database OID: %s",
729 }
730
731 if (PQntuples(res) != 1)
732 {
733 pg_log_error("could not obtain database OID: got %d rows, expected %d row",
734 PQntuples(res), 1);
736 }
737
738 /* Database OID */
739 oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
740
741 PQclear(res);
742
743 /* Random unsigned integer */
744 rand = pg_prng_uint32(&prng_state);
745
746 /*
747 * Build the object name. The name must not exceed NAMEDATALEN - 1. This
748 * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
749 * '\0'): the fixed prefix, the OID printed with %u, the separator, and
750 * the random value printed with %x.
750 */
751 objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
752
753 return objname;
754}
uint32_t uint32
Definition: c.h:502
static pg_prng_state prng_state
uint32 pg_prng_uint32(pg_prng_state *state)
Definition: pg_prng.c:227
unsigned int Oid
Definition: postgres_ext.h:30
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43

References conn, disconnect_database(), pg_log_error, pg_prng_uint32(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), prng_state, and psprintf().

Referenced by setup_publisher().

◆ get_base_conninfo()

static char * get_base_conninfo ( const char *  conninfo,
char **  dbname 
)
static

Definition at line 301 of file pg_createsubscriber.c.

/*
 * get_base_conninfo
 *
 * Parse "conninfo" with PQconninfoParse() and rebuild it without the dbname
 * keyword. Every keyword that has a non-empty value is appended to the
 * result with appendConnStrItem(), except "dbname": its value is copied with
 * pg_strdup() into *dbname (when the caller passed a non-NULL dbname) and
 * left out of the result.
 *
 * Returns a pg_strdup() copy of the rebuilt string, or NULL after logging an
 * error if the connection string cannot be parsed.
 *
 * NOTE(review): this listing omits source lines 303, 313, 317 and 334. Going
 * by the References list they presumably create "buf" with
 * createPQExpBuffer(), free errmsg with PQfreemem() and release "buf" with
 * destroyPQExpBuffer() -- confirm against the real file.
 */
302{
304 PQconninfoOption *conn_opts;
305 PQconninfoOption *conn_opt;
306 char *errmsg = NULL;
307 char *ret;
308
309 conn_opts = PQconninfoParse(conninfo, &errmsg);
310 if (conn_opts == NULL)
311 {
312 pg_log_error("could not parse connection string: %s", errmsg);
314 return NULL;
315 }
316
318 for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
319 {
320 if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
321 {
322 if (strcmp(conn_opt->keyword, "dbname") == 0)
323 {
324 if (dbname)
325 *dbname = pg_strdup(conn_opt->val);
326 continue;
327 }
328 appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
329 }
330 }
331
332 ret = pg_strdup(buf->data);
333
335 PQconninfoFree(conn_opts);
336
337 return ret;
338}
int errmsg(const char *fmt,...)
Definition: elog.c:1071
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7434
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:6150

References appendConnStrItem(), buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), errmsg(), _PQconninfoOption::keyword, pg_log_error, pg_strdup(), PQconninfoFree(), PQconninfoParse(), PQfreemem(), and _PQconninfoOption::val.

Referenced by main().

◆ get_exec_path()

static char * get_exec_path ( const char *  argv0,
const char *  progname 
)
static

Definition at line 371 of file pg_createsubscriber.c.

/*
 * get_exec_path
 *
 * Locate the companion program "progname" via find_other_exec(), passing the
 * version string "<progname> (PostgreSQL) <PG_VERSION>\n", and return its
 * path.
 *
 * Any negative result from find_other_exec() ends in pg_fatal(): -1 is
 * reported as "not found in the same directory", any other negative value as
 * a version mismatch. full_path is only used to make those messages
 * informative; if find_my_exec() fails it falls back to progname.
 *
 * NOTE(review): this listing omits source line 378, presumably the
 * pg_malloc() (listed under References) that allocates exec_path before it is
 * passed to find_other_exec() -- confirm against the real file.
 */
372{
373 char *versionstr;
374 char *exec_path;
375 int ret;
376
377 versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
379 ret = find_other_exec(argv0, progname, versionstr, exec_path);
380
381 if (ret < 0)
382 {
383 char full_path[MAXPGPATH];
384
385 if (find_my_exec(argv0, full_path) < 0)
386 strlcpy(full_path, progname, sizeof(full_path));
387
388 if (ret == -1)
389 pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
390 progname, "pg_createsubscriber", full_path);
391 else
392 pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
393 progname, full_path, "pg_createsubscriber");
394 }
395
396 pg_log_debug("%s path is: %s", progname, exec_path);
397
398 return exec_path;
399}
int find_my_exec(const char *argv0, char *retpath)
Definition: exec.c:160
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition: exec.c:310
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static const char * progname
static char * argv0
Definition: pg_ctl.c:93
static char * exec_path
Definition: pg_ctl.c:88
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45

References argv0, exec_path, find_my_exec(), find_other_exec(), MAXPGPATH, pg_fatal, pg_log_debug, pg_malloc(), progname, psprintf(), and strlcpy().

Referenced by main().

◆ get_primary_sysid()

static uint64 get_primary_sysid ( const char *  conninfo)
static

Definition at line 591 of file pg_createsubscriber.c.

/*
 * get_primary_sysid
 *
 * Connect with "conninfo" (exit_on_error = true) and return the
 * system_identifier reported by pg_catalog.pg_control_system(), parsed as a
 * base-10 unsigned 64-bit value. Exactly one result row is expected.
 *
 * NOTE(review): this listing omits source lines 602, 605-606, 612 and 620.
 * Going by the References list they presumably hold the PQresultStatus()
 * check, the rest of the error report and the disconnect_database() calls --
 * confirm against the real file.
 */
592{
593 PGconn *conn;
594 PGresult *res;
595 uint64 sysid;
596
597 pg_log_info("getting system identifier from publisher");
598
599 conn = connect_database(conninfo, true);
600
601 res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
603 {
604 pg_log_error("could not get system identifier: %s",
607 }
608 if (PQntuples(res) != 1)
609 {
610 pg_log_error("could not get system identifier: got %d rows, expected %d row",
611 PQntuples(res), 1);
613 }
614
615 sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
616
617 pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
618
619 PQclear(res);
621
622 return sysid;
623}
uint64_t uint64
Definition: c.h:503

References conn, connect_database(), disconnect_database(), pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), and PQresultStatus().

Referenced by main().

◆ get_publisher_databases()

static void get_publisher_databases ( struct CreateSubscriberOptions *  opt,
bool  dbnamespecified 
)
static

Definition at line 1971 of file pg_createsubscriber.c.

/*
 * get_publisher_databases
 *
 * Query the publisher for its databases and count each one in num_dbs. The
 * query selects databases that are not templates, allow connections and
 * whose datconnlimit is not -2, ordered by name.
 *
 * When no database name was given in the publisher connection string, the
 * function connects to "postgres" first (failure tolerated) and then to
 * "template1" (failure is fatal: exit_on_error = true).
 *
 * NOTE(review): this listing omits source lines 1979, 2001 and 2008. Going by
 * the References list they presumably connect with opt->pub_conninfo_str when
 * dbnamespecified is true, call disconnect_database() on query failure, and
 * append each name to opt->database_names with simple_string_list_append() --
 * confirm against the real file.
 */
1973{
1974 PGconn *conn;
1975 PGresult *res;
1976
1977 /* If a database name was specified, just connect to it. */
1978 if (dbnamespecified)
1980 else
1981 {
1982 /* Otherwise, try postgres first and then template1. */
1983 char *conninfo;
1984
1985 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "postgres");
1986 conn = connect_database(conninfo, false);
1987 pg_free(conninfo);
1988 if (!conn)
1989 {
1990 conninfo = concat_conninfo_dbname(opt->pub_conninfo_str, "template1");
1991 conn = connect_database(conninfo, true);
1992 pg_free(conninfo);
1993 }
1994 }
1995
1996 res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
1997 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1998 {
1999 pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
2000 PQclear(res);
2002 }
2003
2004 for (int i = 0; i < PQntuples(res); i++)
2005 {
2006 const char *dbname = PQgetvalue(res, i, 0);
2007
2009
2010 /* Increment num_dbs to reflect multiple --database options */
2011 num_dbs++;
2012 }
2013
2014 PQclear(res);
2015 disconnect_database(conn, false);
2016}
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition: simple_list.c:63
SimpleStringList database_names

References concat_conninfo_dbname(), conn, connect_database(), CreateSubscriberOptions::database_names, dbname, disconnect_database(), i, num_dbs, pg_free(), pg_log_error, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), CreateSubscriberOptions::pub_conninfo_str, and simple_string_list_append().

Referenced by main().

◆ get_standby_sysid()

static uint64 get_standby_sysid ( const char *  datadir)
static

Definition at line 631 of file pg_createsubscriber.c.

/*
 * get_standby_sysid
 *
 * Read the control file under "datadir" with get_controlfile() and return its
 * system_identifier. Exits via pg_fatal() if get_controlfile() reports that
 * the CRC check failed. The ControlFileData returned by get_controlfile() is
 * released with pg_free() before returning.
 */
632{
633 ControlFileData *cf;
634 bool crc_ok;
635 uint64 sysid;
636
637 pg_log_info("getting system identifier from subscriber");
638
639 cf = get_controlfile(datadir, &crc_ok);
640 if (!crc_ok)
641 pg_fatal("control file appears to be corrupt");
642
643 sysid = cf->system_identifier;
644
645 pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
646
647 pg_free(cf);
648
649 return sysid;
650}
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
uint64 system_identifier
Definition: pg_control.h:110

References datadir, get_controlfile(), pg_fatal, pg_free(), pg_log_info, and ControlFileData::system_identifier.

Referenced by main().

◆ get_sub_conninfo()

static char * get_sub_conninfo ( const struct CreateSubscriberOptions *  opt)
static

Definition at line 345 of file pg_createsubscriber.c.

/*
 * get_sub_conninfo
 *
 * Build the subscriber's base connection string from the options: "port"
 * from opt->sub_port, "host" from opt->socket_dir (omitted when WIN32 is
 * defined), "user" from opt->sub_username when it is set, and
 * "fallback_application_name" from progname. No dbname is included.
 *
 * Returns a pg_strdup() copy of the string.
 *
 * NOTE(review): this listing omits source lines 347 and 360. Going by the
 * References list they presumably create "buf" with createPQExpBuffer() and
 * release it with destroyPQExpBuffer() -- confirm against the real file.
 */
346{
348 char *ret;
349
350 appendConnStrItem(buf, "port", opt->sub_port);
351#if !defined(WIN32)
352 appendConnStrItem(buf, "host", opt->socket_dir);
353#endif
354 if (opt->sub_username != NULL)
355 appendConnStrItem(buf, "user", opt->sub_username);
356 appendConnStrItem(buf, "fallback_application_name", progname);
357
358 ret = pg_strdup(buf->data);
359
361
362 return ret;
363}

References appendConnStrItem(), buf, createPQExpBuffer(), destroyPQExpBuffer(), pg_strdup(), progname, CreateSubscriberOptions::socket_dir, CreateSubscriberOptions::sub_port, and CreateSubscriberOptions::sub_username.

Referenced by main().

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 2019 of file pg_createsubscriber.c.

/*
 * main
 *
 * Entry point of pg_createsubscriber. In the order visible below it: parses
 * and validates the command line, builds the publisher and subscriber base
 * connection strings, records per-database information, registers
 * cleanup_objects_atexit(), checks that publisher and subscriber share a
 * system identifier, refuses to run if the standby's postmaster.pid exists,
 * then starts the standby, sets up the publisher, recovery parameters and the
 * subscriber, and finally marks success.
 *
 * NOTE(review): this listing omits every source line that carried a
 * hyperlinked identifier (for example 2062, 2086, 2089, 2110, 2121, 2123,
 * 2130-2131 and many more), so several branches and calls appear empty or
 * missing here. Read it together with the real file.
 */
2020{
2021 static struct option long_options[] =
2022 {
2023 {"all", no_argument, NULL, 'a'},
2024 {"database", required_argument, NULL, 'd'},
2025 {"pgdata", required_argument, NULL, 'D'},
2026 {"dry-run", no_argument, NULL, 'n'},
2027 {"subscriber-port", required_argument, NULL, 'p'},
2028 {"publisher-server", required_argument, NULL, 'P'},
2029 {"remove", required_argument, NULL, 'R'},
2030 {"socketdir", required_argument, NULL, 's'},
2031 {"recovery-timeout", required_argument, NULL, 't'},
2032 {"enable-two-phase", no_argument, NULL, 'T'},
2033 {"subscriber-username", required_argument, NULL, 'U'},
2034 {"verbose", no_argument, NULL, 'v'},
2035 {"version", no_argument, NULL, 'V'},
2036 {"help", no_argument, NULL, '?'},
2037 {"config-file", required_argument, NULL, 1},
2038 {"publication", required_argument, NULL, 2},
2039 {"replication-slot", required_argument, NULL, 3},
2040 {"subscription", required_argument, NULL, 4},
2041 {NULL, 0, NULL, 0}
2042 };
2043
2044 struct CreateSubscriberOptions opt = {0};
2045
2046 int c;
2047 int option_index;
2048
2049 char *pub_base_conninfo;
2050 char *sub_base_conninfo;
2051 char *dbname_conninfo = NULL;
2052
2053 uint64 pub_sysid;
2054 uint64 sub_sysid;
2055 struct stat statbuf;
2056
2057 char *consistent_lsn;
2058
2059 char pidfile[MAXPGPATH];
2060
2061 pg_logging_init(argv[0]);
2063 progname = get_progname(argv[0]);
/* NOTE(review): the text domain is "pg_basebackup", not this program's name; presumably a shared message catalog -- confirm. */
2064 set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
2065
/* --help and --version are honored only as the first argument, before any other processing. */
2066 if (argc > 1)
2067 {
2068 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
2069 {
2070 usage();
2071 exit(0);
2072 }
2073 else if (strcmp(argv[1], "-V") == 0
2074 || strcmp(argv[1], "--version") == 0)
2075 {
2076 puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
2077 exit(0);
2078 }
2079 }
2080
2081 /* Default settings */
2082 subscriber_dir = NULL;
2083 opt.config_file = NULL;
2084 opt.pub_conninfo_str = NULL;
2085 opt.socket_dir = NULL;
2087 opt.sub_username = NULL;
2088 opt.two_phase = false;
2090 {
2091 0
2092 };
2093 opt.recovery_timeout = 0;
2094 opt.all_dbs = false;
2095
2096 /*
2097 * Don't allow it to be run as root. It uses pg_ctl which does not allow
2098 * it either.
2099 */
2100#ifndef WIN32
2101 if (geteuid() == 0)
2102 {
2103 pg_log_error("cannot be executed by \"root\"");
2104 pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
2105 progname);
2106 exit(1);
2107 }
2108#endif
2109
2111
2112 while ((c = getopt_long(argc, argv, "ad:D:np:P:R:s:t:TU:v",
2113 long_options, &option_index)) != -1)
2114 {
2115 switch (c)
2116 {
2117 case 'a':
2118 opt.all_dbs = true;
2119 break;
2120 case 'd':
2122 {
2124 num_dbs++;
2125 }
2126 else
2127 pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
2128 break;
2129 case 'D':
2132 break;
2133 case 'n':
2134 dry_run = true;
2135 break;
2136 case 'p':
2137 opt.sub_port = pg_strdup(optarg);
2138 break;
2139 case 'P':
2141 break;
2142 case 'R':
2145 else
2146 pg_fatal("object type \"%s\" is specified more than once for -R/--remove", optarg);
2147 break;
2148 case 's':
2151 break;
2152 case 't':
/* NOTE(review): atoi() reports no errors; a non-numeric argument yields 0, the same as the default set above. */
2153 opt.recovery_timeout = atoi(optarg);
2154 break;
2155 case 'T':
2156 opt.two_phase = true;
2157 break;
2158 case 'U':
2160 break;
2161 case 'v':
2163 break;
2164 case 1:
2166 break;
2167 case 2:
2169 {
2171 num_pubs++;
2172 }
2173 else
2174 pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
2175 break;
2176 case 3:
2178 {
2180 num_replslots++;
2181 }
2182 else
2183 pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
2184 break;
2185 case 4:
2187 {
2189 num_subs++;
2190 }
2191 else
2192 pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
2193 break;
2194 default:
2195 /* getopt_long already emitted a complaint */
2196 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2197 exit(1);
2198 }
2199 }
2200
2201 /* Validate that --all is not used with incompatible options */
2202 if (opt.all_dbs)
2203 {
2204 char *bad_switch = NULL;
2205
2206 if (num_dbs > 0)
2207 bad_switch = "--database";
2208 else if (num_pubs > 0)
2209 bad_switch = "--publication";
2210 else if (num_replslots > 0)
2211 bad_switch = "--replication-slot";
2212 else if (num_subs > 0)
2213 bad_switch = "--subscription";
2214
2215 if (bad_switch)
2216 {
2217 pg_log_error("%s cannot be used with -a/--all", bad_switch);
2218 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2219 exit(1);
2220 }
2221 }
2222
2223 /* Any non-option arguments? */
2224 if (optind < argc)
2225 {
2226 pg_log_error("too many command-line arguments (first is \"%s\")",
2227 argv[optind]);
2228 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2229 exit(1);
2230 }
2231
2232 /* Required arguments */
2233 if (subscriber_dir == NULL)
2234 {
2235 pg_log_error("no subscriber data directory specified");
2236 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2237 exit(1);
2238 }
2239
2240 /* If socket directory is not provided, use the current directory */
2241 if (opt.socket_dir == NULL)
2242 {
2243 char cwd[MAXPGPATH];
2244
2245 if (!getcwd(cwd, MAXPGPATH))
2246 pg_fatal("could not determine current directory");
2247 opt.socket_dir = pg_strdup(cwd);
2249 }
2250
2251 /*
2252 * Parse connection string. Build a base connection string that might be
2253 * reused by multiple databases.
2254 */
2255 if (opt.pub_conninfo_str == NULL)
2256 {
2257 /*
2258 * TODO use primary_conninfo (if available) from subscriber and
2259 * extract publisher connection string. Assume that there are
2260 * identical entries for physical and logical replication. If there is
2261 * not, we would fail anyway.
2262 */
2263 pg_log_error("no publisher connection string specified");
2264 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2265 exit(1);
2266 }
2267 pg_log_info("validating publisher connection string");
2268 pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2269 &dbname_conninfo);
2270 if (pub_base_conninfo == NULL)
2271 exit(1);
2272
2273 pg_log_info("validating subscriber connection string");
2274 sub_base_conninfo = get_sub_conninfo(&opt);
2275
2276 /*
2277 * Fetch all databases from the source (publisher) and treat them as if
2278 * the user had specified multiple --database options, one for each
2279 * source database.
2280 */
2281 if (opt.all_dbs)
2282 {
2283 bool dbnamespecified = (dbname_conninfo != NULL);
2284
2285 get_publisher_databases(&opt, dbnamespecified);
2286 }
2287
2288 if (opt.database_names.head == NULL)
2289 {
2290 pg_log_info("no database was specified");
2291
2292 /*
2293 * Try to obtain the dbname from the publisher conninfo. If dbname
2294 * parameter is not available, error out.
2295 */
2296 if (dbname_conninfo)
2297 {
2298 simple_string_list_append(&opt.database_names, dbname_conninfo);
2299 num_dbs++;
2300
2301 pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2302 dbname_conninfo);
2303 }
2304 else
2305 {
2306 pg_log_error("no database name specified");
2307 pg_log_error_hint("Try \"%s --help\" for more information.",
2308 progname);
2309 exit(1);
2310 }
2311 }
2312
2313 /* Number of object names must match number of databases */
2314 if (num_pubs > 0 && num_pubs != num_dbs)
2315 {
2316 pg_log_error("wrong number of publication names specified");
2317 pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2318 num_pubs, num_dbs);
2319 exit(1);
2320 }
2321 if (num_subs > 0 && num_subs != num_dbs)
2322 {
2323 pg_log_error("wrong number of subscription names specified");
2324 pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2325 num_subs, num_dbs);
2326 exit(1);
2327 }
2328 if (num_replslots > 0 && num_replslots != num_dbs)
2329 {
2330 pg_log_error("wrong number of replication slot names specified");
2331 pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2333 exit(1);
2334 }
2335
2336 /* Verify the object types specified for removal from the subscriber */
2337 for (SimpleStringListCell *cell = opt.objecttypes_to_remove.head; cell; cell = cell->next)
2338 {
2339 if (pg_strcasecmp(cell->val, "publications") == 0)
2341 else
2342 {
2343 pg_log_error("invalid object type \"%s\" specified for -R/--remove", cell->val);
2344 pg_log_error_hint("The valid option is: \"publications\"");
2345 exit(1);
2346 }
2347 }
2348
2349 /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2350 pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2351 pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2352
2353 /* Rudimentary check for a data directory */
2355
2357
2358 /*
2359 * Store database information for publisher and subscriber. It should be
2360 * called before atexit() because its return is used in the
2361 * cleanup_objects_atexit().
2362 */
2363 dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2364
2365 /* Register a function to clean up objects in case of failure */
2366 atexit(cleanup_objects_atexit);
2367
2368 /*
2369 * Check if the subscriber data directory has the same system identifier
2370 * as the publisher data directory.
2371 */
2373 sub_sysid = get_standby_sysid(subscriber_dir);
2374 if (pub_sysid != sub_sysid)
2375 pg_fatal("subscriber data directory is not a copy of the source database cluster");
2376
2377 /* Subscriber PID file */
2378 snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2379
2380 /*
2381 * The standby server must not be running. If the server is started under
2382 * service manager and pg_createsubscriber stops it, the service manager
2383 * might react to this action and start the server again. Therefore,
2384 * refuse to proceed if the server is running to avoid possible failures.
2385 */
2386 if (stat(pidfile, &statbuf) == 0)
2387 {
2388 pg_log_error("standby server is running");
2389 pg_log_error_hint("Stop the standby server and try again.");
2390 exit(1);
2391 }
2392
2393 /*
2394 * Start a short-lived standby server with temporary parameters (provided
2395 * by command-line options). The goal is to avoid connections during the
2396 * transformation steps.
2397 */
2398 pg_log_info("starting the standby server with command-line options");
2399 start_standby_server(&opt, true, false);
2400
2401 /* Check if the standby server is ready for logical replication */
2403
2404 /* Check if the primary server is ready for logical replication */
2406
2407 /*
2408 * Stop the target server. The recovery process requires that the server
2409 * reaches a consistent state before targeting the recovery stop point.
2410 * Make sure a consistent state is reached (stop the target server
2411 * guarantees it) *before* creating the replication slots in
2412 * setup_publisher().
2413 */
2414 pg_log_info("stopping the subscriber");
2416
2417 /* Create the required objects for each database on publisher */
2418 consistent_lsn = setup_publisher(dbinfos.dbinfo);
2419
2420 /* Write the required recovery parameters */
2421 setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn);
2422
2423 /*
2424 * Start subscriber so the recovery parameters will take effect. Wait
2425 * until accepting connections. We don't want to start logical replication
2426 * during setup.
2427 */
2428 pg_log_info("starting the subscriber");
2429 start_standby_server(&opt, true, true);
2430
2431 /* Wait for the subscriber to be promoted */
2433
2434 /*
2435 * Create the subscription for each database on subscriber. It does not
2436 * enable it immediately because it needs to adjust the replication start
2437 * point to the LSN reported by setup_publisher(). It also cleans up
2438 * publications created by this tool and replication to the standby.
2439 */
2440 setup_subscriber(dbinfos.dbinfo, consistent_lsn);
2441
2442 /* Remove primary_slot_name if it exists on primary */
2444
2445 /* Remove failover replication slots if they exist on subscriber */
2447
2448 /* Stop the subscriber */
2449 pg_log_info("stopping the subscriber");
2451
2452 /* Change system identifier from subscriber */
2454
2455 success = true;
2456
2457 pg_log_info("Done!");
2458
2459 return 0;
2460}
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1185
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:429
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:25
#define required_argument
Definition: getopt_long.h:26
void pg_logging_increase_verbosity(void)
Definition: logging.c:185
void pg_logging_init(const char *argv0)
Definition: logging.c:83
void pg_logging_set_level(enum pg_log_level new_level)
Definition: logging.c:176
@ PG_LOG_WARNING
Definition: logging.h:38
#define pg_log_error_detail(...)
Definition: logging.h:109
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
static char * pg_ctl_path
static char * get_exec_path(const char *argv0, const char *progname)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static void check_data_directory(const char *datadir)
static void usage()
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified)
static char * pg_resetwal_path
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
PGDLLIMPORT int optind
Definition: getopt.c:51
PGDLLIMPORT char * optarg
Definition: getopt.c:53
int pg_strcasecmp(const char *s1, const char *s2)
Definition: pgstrcasecmp.c:36
void canonicalize_path(char *path)
Definition: path.c:337
const char * get_progname(const char *argv0)
Definition: path.c:652
char * c
void get_restricted_token(void)
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition: simple_list.c:87
struct SimpleStringList SimpleStringList
SimpleStringList objecttypes_to_remove
SimpleStringList replslot_names
struct SimpleStringListCell * next
Definition: simple_list.h:34
SimpleStringListCell * head
Definition: simple_list.h:42

References CreateSubscriberOptions::all_dbs, canonicalize_path(), check_data_directory(), check_publisher(), check_subscriber(), cleanup_objects_atexit(), CreateSubscriberOptions::config_file, CreateSubscriberOptions::database_names, LogicalRepInfos::dbinfo, dbinfos, DEFAULT_SUB_PORT, drop_failover_replication_slots(), drop_primary_replication_slot(), dry_run, get_base_conninfo(), get_exec_path(), get_primary_sysid(), get_progname(), get_publisher_databases(), get_restricted_token(), get_standby_sysid(), get_sub_conninfo(), getopt_long(), SimpleStringList::head, MAXPGPATH, modify_subscriber_sysid(), SimpleStringListCell::next, no_argument, num_dbs, num_pubs, num_replslots, num_subs, OBJECTTYPE_PUBLICATIONS, CreateSubscriberOptions::objecttypes_to_remove, LogicalRepInfos::objecttypes_to_remove, optarg, optind, pg_ctl_path, pg_fatal, pg_log_error, pg_log_error_detail, pg_log_error_hint, pg_log_info, PG_LOG_WARNING, pg_logging_increase_verbosity(), pg_logging_init(), pg_logging_set_level(), pg_resetwal_path, pg_strcasecmp(), pg_strdup(), PG_TEXTDOMAIN, primary_slot_name, progname, CreateSubscriberOptions::pub_conninfo_str, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, CreateSubscriberOptions::recovery_timeout, CreateSubscriberOptions::replslot_names, required_argument, set_pglocale_pgservice(), setup_publisher(), setup_recovery(), setup_subscriber(), simple_string_list_append(), simple_string_list_member(), snprintf, CreateSubscriberOptions::socket_dir, start_standby_server(), stat, stop_standby_server(), store_pub_sub_info(), CreateSubscriberOptions::sub_names, CreateSubscriberOptions::sub_port, CreateSubscriberOptions::sub_username, LogicalRepInfo::subconninfo, subscriber_dir, success, CreateSubscriberOptions::two_phase, LogicalRepInfos::two_phase, usage(), and wait_for_end_recovery().

◆ modify_subscriber_sysid()

static void modify_subscriber_sysid ( const struct CreateSubscriberOptions * opt)
static

Definition at line 658 of file pg_createsubscriber.c.

659{
660 ControlFileData *cf;
661 bool crc_ok;
662 struct timeval tv;
663
664 char *cmd_str;
665
666 pg_log_info("modifying system identifier of subscriber");
667
668 cf = get_controlfile(subscriber_dir, &crc_ok);
669 if (!crc_ok)
670 pg_fatal("control file appears to be corrupt");
671
672 /*
673 * Select a new system identifier.
674 *
675 * XXX this code was extracted from BootStrapXLOG().
676 */
677 gettimeofday(&tv, NULL);
678 cf->system_identifier = ((uint64) tv.tv_sec) << 32;
679 cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
680 cf->system_identifier |= getpid() & 0xFFF;
681
682 if (!dry_run)
684
685 pg_log_info("system identifier is %" PRIu64 " on subscriber",
687
688 pg_log_info("running pg_resetwal on the subscriber");
689
690 cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
692
693 pg_log_debug("pg_resetwal command is: %s", cmd_str);
694
695 if (!dry_run)
696 {
697 int rc = system(cmd_str);
698
699 if (rc == 0)
700 pg_log_info("subscriber successfully changed the system identifier");
701 else
702 pg_fatal("could not change system identifier of subscriber: %s", wait_result_to_str(rc));
703 }
704
705 pg_free(cf);
706}
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
#define DEVNULL
Definition: port.h:161
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:33
int gettimeofday(struct timeval *tp, void *tzp)

References DEVNULL, dry_run, get_controlfile(), gettimeofday(), pg_fatal, pg_free(), pg_log_debug, pg_log_info, pg_resetwal_path, psprintf(), subscriber_dir, ControlFileData::system_identifier, update_controlfile(), and wait_result_to_str().

Referenced by main().

◆ pg_ctl_status()

static void pg_ctl_status ( const char *  pg_ctl_cmd,
int  rc 
)
static

Definition at line 1449 of file pg_createsubscriber.c.

/*
 * Check the result of a pg_ctl invocation.
 *
 * pg_ctl_cmd is the command line that was run; it is used only in the error
 * detail message.  rc is the status value obtained for that command (the
 * callers visible in this file pass the return value of system()).
 *
 * If rc is zero this returns without doing anything.  Otherwise it logs the
 * reason -- a normal exit with a non-zero code, termination by a signal (an
 * exception on WIN32), or an unrecognized status -- then logs the failed
 * command and terminates the program with exit(1).
 */
1450{
1451 if (rc != 0)
1452 {
1453 if (WIFEXITED(rc))
1454 {
1455 pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
1456 }
1457 else if (WIFSIGNALED(rc))
1458 {
1459#if defined(WIN32)
1460 pg_log_error("pg_ctl was terminated by exception 0x%X",
1461 WTERMSIG(rc));
1462 pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1463#else
1464 pg_log_error("pg_ctl was terminated by signal %d: %s",
1465 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1466#endif
1467 }
1468 else
1469 {
/* Neither a normal exit nor a signal: report the raw status value. */
1470 pg_log_error("pg_ctl exited with unrecognized status %d", rc);
1471 }
1472
/* Every failure path ends here: show the command, then give up. */
1473 pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
1474 exit(1);
1475 }
1476}
const char * pg_strsignal(int signum)
Definition: pgstrsignal.c:39
#define WIFEXITED(w)
Definition: win32_port.h:150
#define WIFSIGNALED(w)
Definition: win32_port.h:151
#define WTERMSIG(w)
Definition: win32_port.h:153
#define WEXITSTATUS(w)
Definition: win32_port.h:152

References pg_log_error, pg_log_error_detail, pg_strsignal(), WEXITSTATUS, WIFEXITED, WIFSIGNALED, and WTERMSIG.

Referenced by start_standby_server(), and stop_standby_server().

◆ server_is_in_recovery()

static bool server_is_in_recovery ( PGconn * conn)
static

Definition at line 842 of file pg_createsubscriber.c.

843{
844 PGresult *res;
845 int ret;
846
847 res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
848
850 {
851 pg_log_error("could not obtain recovery progress: %s",
854 }
855
856
857 ret = strcmp("t", PQgetvalue(res, 0, 0));
858
859 PQclear(res);
860
861 return ret == 0;
862}

References conn, disconnect_database(), pg_log_error, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQresultErrorMessage(), and PQresultStatus().

Referenced by check_publisher(), check_subscriber(), and wait_for_end_recovery().

◆ set_replication_progress()

static void set_replication_progress ( PGconn * conn,
const struct LogicalRepInfo * dbinfo,
const char *  lsn 
)
static

Definition at line 1840 of file pg_createsubscriber.c.

1841{
1843 PGresult *res;
1844 Oid suboid;
1845 char *subname;
1846 char *dbname;
1847 char *originname;
1848 char *lsnstr;
1849
1850 Assert(conn != NULL);
1851
1852 subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
1853 dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1854
1856 "SELECT s.oid FROM pg_catalog.pg_subscription s "
1857 "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1858 "WHERE s.subname = %s AND d.datname = %s",
1859 subname, dbname);
1860
1861 res = PQexec(conn, str->data);
1862 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1863 {
1864 pg_log_error("could not obtain subscription OID: %s",
1867 }
1868
1869 if (PQntuples(res) != 1 && !dry_run)
1870 {
1871 pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1872 PQntuples(res), 1);
1874 }
1875
1876 if (dry_run)
1877 {
1878 suboid = InvalidOid;
1880 }
1881 else
1882 {
1883 suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1884 lsnstr = psprintf("%s", lsn);
1885 }
1886
1887 PQclear(res);
1888
1889 /*
1890 * The origin name is defined as pg_%u. %u is the subscription OID. See
1891 * ApplyWorkerMain().
1892 */
1893 originname = psprintf("pg_%u", suboid);
1894
1895 pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1896 originname, lsnstr, dbinfo->dbname);
1897
1900 "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1901 originname, lsnstr);
1902
1903 pg_log_debug("command is: %s", str->data);
1904
1905 if (!dry_run)
1906 {
1907 res = PQexec(conn, str->data);
1908 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1909 {
1910 pg_log_error("could not set replication progress for subscription \"%s\": %s",
1911 dbinfo->subname, PQresultErrorMessage(res));
1913 }
1914 PQclear(res);
1915 }
1916
1919 pg_free(originname);
1920 pg_free(lsnstr);
1922}
#define InvalidOid
Definition: postgres_ext.h:35
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References appendPQExpBuffer(), Assert(), conn, createPQExpBuffer(), LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, InvalidOid, InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQfreemem(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), psprintf(), resetPQExpBuffer(), str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ setup_publisher()

static char * setup_publisher ( struct LogicalRepInfo * dbinfo)
static

Definition at line 763 of file pg_createsubscriber.c.

764{
765 char *lsn = NULL;
766
767 pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
768
769 for (int i = 0; i < num_dbs; i++)
770 {
771 PGconn *conn;
772 char *genname = NULL;
773
774 conn = connect_database(dbinfo[i].pubconninfo, true);
775
776 /*
777 * If an object name was not specified via a command-line option, assign
778 * a generated object name. The replication slot has a different rule.
779 * The subscription name is assigned to the replication slot name if
780 * no replication slot is specified. It follows the same rule as
781 * CREATE SUBSCRIPTION.
782 */
783 if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
784 genname = generate_object_name(conn);
785 if (num_pubs == 0)
786 dbinfo[i].pubname = pg_strdup(genname);
787 if (num_subs == 0)
788 dbinfo[i].subname = pg_strdup(genname);
789 if (num_replslots == 0)
790 dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
791
792 /*
793 * Create publication on publisher. This step should be executed
794 * *before* promoting the subscriber to avoid any transactions between
795 * consistent LSN and the new publication rows (such transactions
796 * wouldn't see the new publication rows resulting in an error).
797 */
798 create_publication(conn, &dbinfo[i]);
799
800 /* Create replication slot on publisher */
801 if (lsn)
802 pg_free(lsn);
803 lsn = create_logical_replication_slot(conn, &dbinfo[i]);
804 if (lsn != NULL || dry_run)
805 pg_log_info("create replication slot \"%s\" on publisher",
806 dbinfo[i].replslotname);
807 else
808 exit(1);
809
810 /*
811 * Since we are using the LSN returned by the last replication slot as
812 * recovery_target_lsn, this LSN is ahead of the current WAL position
813 * and the recovery waits until the publisher writes a WAL record to
814 * reach the target and ends the recovery. On idle systems, this wait
815 * time is unpredictable and could lead to failure in promoting the
816 * subscriber. To avoid that, insert a harmless WAL record.
817 */
818 if (i == num_dbs - 1 && !dry_run)
819 {
820 PGresult *res;
821
822 res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
824 {
825 pg_log_error("could not write an additional WAL record: %s",
828 }
829 PQclear(res);
830 }
831
833 }
834
835 return lsn;
836}
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static char * generate_object_name(PGconn *conn)
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition: pg_prng.c:89

References conn, connect_database(), create_logical_replication_slot(), create_publication(), disconnect_database(), dry_run, generate_object_name(), i, num_dbs, num_pubs, num_replslots, num_subs, pg_free(), pg_log_error, pg_log_info, pg_prng_seed(), pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQresultErrorMessage(), PQresultStatus(), prng_state, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, LogicalRepInfo::subname, and subname.

Referenced by main().

◆ setup_recovery()

static void setup_recovery ( const struct LogicalRepInfo * dbinfo,
const char *  datadir,
const char *  lsn 
)
static

Definition at line 1227 of file pg_createsubscriber.c.

1228{
1229 PGconn *conn;
1231
1232 /*
1233 * Although the recovery parameters will be written to the subscriber,
1234 * use a publisher connection. The primary_conninfo is generated using the
1235 * connection settings.
1236 */
1237 conn = connect_database(dbinfo[0].pubconninfo, true);
1238
1239 /*
1240 * Write recovery parameters.
1241 *
1242 * The subscriber is not running yet. In dry run mode, the recovery
1243 * parameters *won't* be written. An invalid LSN is used for printing
1244 * purposes. Additional recovery parameters are added here. It avoids
1245 * unexpected behavior such as end of recovery as soon as a consistent
1246 * state is reached (recovery_target) and failure due to multiple recovery
1247 * targets (name, time, xid, LSN).
1248 */
1250 appendPQExpBufferStr(recoveryconfcontents, "recovery_target = ''\n");
1252 "recovery_target_timeline = 'latest'\n");
1254 "recovery_target_inclusive = true\n");
1256 "recovery_target_action = promote\n");
1257 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_name = ''\n");
1258 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_time = ''\n");
1259 appendPQExpBufferStr(recoveryconfcontents, "recovery_target_xid = ''\n");
1260
1261 if (dry_run)
1262 {
1263 appendPQExpBufferStr(recoveryconfcontents, "# dry run mode");
1265 "recovery_target_lsn = '%X/%X'\n",
1267 }
1268 else
1269 {
1270 appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1271 lsn);
1273 }
1274 disconnect_database(conn, false);
1275
1276 pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1277}
static PQExpBuffer recoveryconfcontents
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
Definition: recovery_gen.c:125
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
Definition: recovery_gen.c:28

References appendPQExpBuffer(), appendPQExpBufferStr(), conn, connect_database(), PQExpBufferData::data, datadir, disconnect_database(), dry_run, GenerateRecoveryConfig(), InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_log_debug, recoveryconfcontents, and WriteRecoveryConfig().

Referenced by main().

◆ setup_subscriber()

static void setup_subscriber ( struct LogicalRepInfo * dbinfo,
const char *  consistent_lsn 
)
static

Definition at line 1191 of file pg_createsubscriber.c.

1192{
1193 for (int i = 0; i < num_dbs; i++)
1194 {
1195 PGconn *conn;
1196
1197 /* Connect to subscriber. */
1198 conn = connect_database(dbinfo[i].subconninfo, true);
1199
1200 /*
1201 * We don't need the pre-existing subscriptions on the newly formed
1202 * subscriber. They can connect to other publisher nodes and either
1203 * get some unwarranted data or can lead to ERRORs in connecting to
1204 * such nodes.
1205 */
1207
1208 /* Check and drop the required publications in the given database. */
1210
1211 create_subscription(conn, &dbinfo[i]);
1212
1213 /* Set the replication progress to the correct LSN */
1214 set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1215
1216 /* Enable subscription */
1217 enable_subscription(conn, &dbinfo[i]);
1218
1219 disconnect_database(conn, false);
1220 }
1221}
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)

References check_and_drop_existing_subscriptions(), check_and_drop_publications(), conn, connect_database(), create_subscription(), disconnect_database(), enable_subscription(), i, num_dbs, and set_replication_progress().

Referenced by main().

◆ start_standby_server()

static void start_standby_server ( const struct CreateSubscriberOptions * opt,
bool  restricted_access,
bool  restrict_logical_worker 
)
static

Definition at line 1479 of file pg_createsubscriber.c.

/*
 * Start the standby (subscriber) server by running "pg_ctl start" through
 * system().
 *
 * The command always targets subscriber_dir and always passes
 * sync_replication_slots=off and idle_replication_slot_timeout=0.
 *
 * restricted_access: when true, the server is started on opt->sub_port and,
 * on non-Windows builds, with listen_addresses='' and
 * unix_socket_permissions=0700 (plus unix_socket_directories when
 * opt->socket_dir is set).
 *
 * restrict_logical_worker: when true, max_logical_replication_workers=0 is
 * passed as well.
 *
 * opt->config_file, when not NULL, is passed as config_file.
 *
 * The exit status is handed to pg_ctl_status(), which terminates the program
 * on a non-zero status, so standby_running is set to true only after a
 * successful start.
 *
 * NOTE(review): subscriber_dir is quoted with appendShellString(), but
 * opt->socket_dir and opt->config_file are interpolated as-is inside
 * double/single quotes; a value containing quote characters would likely
 * break the command line -- confirm whether these are validated elsewhere.
 */
1481{
1482 PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1483 int rc;
1484
1485 appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1486 appendShellString(pg_ctl_cmd, subscriber_dir);
1487 appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1488
1489 /* Prevent unintended slot invalidation */
1490 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
1491
1492 if (restricted_access)
1493 {
1494 appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1495#if !defined(WIN32)
1496
1497 /*
1498 * An empty listen_addresses list means the server does not listen on
1499 * any IP interfaces; only Unix-domain sockets can be used to connect
1500 * to the server. Prevent external connections to minimize the chance
1501 * of failure.
1502 */
1503 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1504 if (opt->socket_dir)
1505 appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1506 opt->socket_dir);
/* Close the double quote opened by the -o option started above. */
1507 appendPQExpBufferChar(pg_ctl_cmd, '"');
1508#endif
1509 }
1510 if (opt->config_file != NULL)
1511 appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1512 opt->config_file);
1513
1514 /* Prevent logical replication workers from starting, if requested */
1515 if (restrict_logical_worker)
1516 appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1517
1518 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1519 rc = system(pg_ctl_cmd->data);
/* Does not return if rc is non-zero. */
1520 pg_ctl_status(pg_ctl_cmd->data, rc);
1521 standby_running = true;
1522 destroyPQExpBuffer(pg_ctl_cmd);
1523 pg_log_info("server was started");
1524}
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
void appendShellString(PQExpBuffer buf, const char *str)
Definition: string_utils.c:582

References appendPQExpBuffer(), appendPQExpBufferChar(), appendPQExpBufferStr(), appendShellString(), CreateSubscriberOptions::config_file, createPQExpBuffer(), PQExpBufferData::data, destroyPQExpBuffer(), pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, CreateSubscriberOptions::socket_dir, standby_running, CreateSubscriberOptions::sub_port, and subscriber_dir.

Referenced by main().

◆ stop_standby_server()

static void stop_standby_server ( const char *  datadir)
static

Definition at line 1527 of file pg_createsubscriber.c.

/*
 * Stop the standby server whose data directory is datadir by running
 * "pg_ctl stop -D <datadir> -s" through system().
 *
 * The exit status is handed to pg_ctl_status(), which terminates the program
 * on a non-zero status; standby_running is therefore cleared only after a
 * successful stop.
 *
 * NOTE(review): the string returned by psprintf() is not released in this
 * function, whereas other psprintf() results in this file are passed to
 * pg_free() -- looks like a small leak; confirm.
 * NOTE(review): datadir is wrapped in plain double quotes here, while
 * start_standby_server() uses appendShellString() for the data directory --
 * confirm whether the difference is intentional.
 */
1528{
1529 char *pg_ctl_cmd;
1530 int rc;
1531
1532 pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1533 datadir);
1534 pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1535 rc = system(pg_ctl_cmd);
/* Does not return if rc is non-zero. */
1536 pg_ctl_status(pg_ctl_cmd, rc);
1537 standby_running = false;
1538 pg_log_info("server was stopped");
1539}

References datadir, pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, psprintf(), and standby_running.

Referenced by cleanup_objects_atexit(), main(), and wait_for_end_recovery().

◆ store_pub_sub_info()

static struct LogicalRepInfo * store_pub_sub_info ( const struct CreateSubscriberOptions * opt,
const char *  pub_base_conninfo,
const char *  sub_base_conninfo 
)
static

Definition at line 463 of file pg_createsubscriber.c.

/*
 * Build the per-database array of publisher/subscriber settings.
 *
 * Allocates an array of num_dbs LogicalRepInfo entries and fills one entry
 * per cell of opt->database_names, in list order:
 *   - pubconninfo / subconninfo are produced by concat_conninfo_dbname()
 *     from pub_base_conninfo / sub_base_conninfo and the database name;
 *   - dbname, pubname, replslotname and subname point at the 'val' storage
 *     of the corresponding list cells (they are not copied);
 *   - pubname, replslotname and subname are NULL when the respective
 *     counter (num_pubs, num_replslots, num_subs) is zero;
 *   - made_replslot and made_publication start out false.
 *
 * Returns the newly allocated array.
 *
 * NOTE(review): when a counter is > 0 the matching list cursor is
 * dereferenced on every iteration, so that list must contain at least as
 * many entries as opt->database_names; likewise 'i' is not checked against
 * num_dbs.  Presumably the caller validates the option counts -- confirm.
 */
466{
467 struct LogicalRepInfo *dbinfo;
468 SimpleStringListCell *pubcell = NULL;
469 SimpleStringListCell *subcell = NULL;
470 SimpleStringListCell *replslotcell = NULL;
471 int i = 0;
472
473 dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
474
/* Position a cursor on each name list that was actually supplied. */
475 if (num_pubs > 0)
476 pubcell = opt->pub_names.head;
477 if (num_subs > 0)
478 subcell = opt->sub_names.head;
479 if (num_replslots > 0)
480 replslotcell = opt->replslot_names.head;
481
482 for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
483 {
484 char *conninfo;
485
486 /* Fill publisher attributes */
487 conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
488 dbinfo[i].pubconninfo = conninfo;
489 dbinfo[i].dbname = cell->val;
490 if (num_pubs > 0)
491 dbinfo[i].pubname = pubcell->val;
492 else
493 dbinfo[i].pubname = NULL;
494 if (num_replslots > 0)
495 dbinfo[i].replslotname = replslotcell->val;
496 else
497 dbinfo[i].replslotname = NULL;
498 dbinfo[i].made_replslot = false;
499 dbinfo[i].made_publication = false;
500 /* Fill subscriber attributes */
501 conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
502 dbinfo[i].subconninfo = conninfo;
503 if (num_subs > 0)
504 dbinfo[i].subname = subcell->val;
505 else
506 dbinfo[i].subname = NULL;
507 /* Other fields will be filled later */
508
/* Names that are still NULL are shown as "(auto)" in the debug output. */
509 pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
510 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
511 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
512 dbinfo[i].pubconninfo);
513 pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
514 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
515 dbinfo[i].subconninfo,
516 dbinfos.two_phase ? "true" : "false");
517
/* Advance the name cursors in step with the database list. */
518 if (num_pubs > 0)
519 pubcell = pubcell->next;
520 if (num_subs > 0)
521 subcell = subcell->next;
522 if (num_replslots > 0)
523 replslotcell = replslotcell->next;
524
525 i++;
526 }
527
528 return dbinfo;
529}
#define pg_malloc_array(type, count)
Definition: fe_memutils.h:56
char val[FLEXIBLE_ARRAY_MEMBER]
Definition: simple_list.h:37

References concat_conninfo_dbname(), CreateSubscriberOptions::database_names, dbinfos, LogicalRepInfo::dbname, SimpleStringList::head, i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, SimpleStringListCell::next, num_dbs, num_pubs, num_replslots, num_subs, pg_log_debug, pg_malloc_array, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, CreateSubscriberOptions::replslot_names, LogicalRepInfo::replslotname, CreateSubscriberOptions::sub_names, LogicalRepInfo::subconninfo, LogicalRepInfo::subname, subname, LogicalRepInfos::two_phase, and SimpleStringListCell::val.

Referenced by main().

◆ usage()

static void usage ( void  )
static

Definition at line 242 of file pg_createsubscriber.c.

/*
 * Print the command-line help text: a one-line description, the usage
 * synopsis, the list of options, the bug-report address and the home page.
 *
 * progname is substituted into the description and synopsis, and
 * DEFAULT_SUB_PORT into the --subscriber-port entry.  Every string is passed
 * through the _() macro before being printed.
 */
243{
244 printf(_("%s creates a new logical replica from a standby server.\n\n"),
245 progname);
246 printf(_("Usage:\n"));
247 printf(_(" %s [OPTION]...\n"), progname);
248 printf(_("\nOptions:\n"));
249 printf(_(" -a, --all create subscriptions for all databases except template\n"
250 " databases or databases that don't allow connections\n"));
251 printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
252 printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
253 printf(_(" -n, --dry-run dry run, just show what would be done\n"));
254 printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
255 printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
256 printf(_(" -R, --remove=OBJECTTYPE remove all objects of the specified type from specified\n"
257 " databases on the subscriber; accepts: publications\n"));
258 printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
259 printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
260 printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n"));
261 printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
262 printf(_(" -v, --verbose output verbose messages\n"));
263 printf(_(" --config-file=FILENAME use specified main server configuration\n"
264 " file when running target cluster\n"));
265 printf(_(" --publication=NAME publication name\n"));
266 printf(_(" --replication-slot=NAME replication slot name\n"));
267 printf(_(" --subscription=NAME subscription name\n"));
268 printf(_(" -V, --version output version information, then exit\n"));
269 printf(_(" -?, --help show this help, then exit\n"));
270 printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
271 printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
272}
#define _(x)
Definition: elog.c:91
#define printf(...)
Definition: port.h:245

References _, DEFAULT_SUB_PORT, printf, and progname.

Referenced by main().

◆ wait_for_end_recovery()

static void wait_for_end_recovery ( const char *  conninfo,
const struct CreateSubscriberOptions * opt
)
static

Definition at line 1551 of file pg_createsubscriber.c.

1552{
1553 PGconn *conn;
1554 int status = POSTMASTER_STILL_STARTING;
1555 int timer = 0;
1556
1557 pg_log_info("waiting for the target server to reach the consistent state");
1558
1559 conn = connect_database(conninfo, true);
1560
1561 for (;;)
1562 {
1563 bool in_recovery = server_is_in_recovery(conn);
1564
1565 /*
1566 * Has the recovery process finished? In dry run mode, there is no
1567 * recovery mode. Bail out as the recovery process has ended.
1568 */
1569 if (!in_recovery || dry_run)
1570 {
1571 status = POSTMASTER_READY;
1572 recovery_ended = true;
1573 break;
1574 }
1575
1576 /* Bail out after recovery_timeout seconds if this option is set */
1577 if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1578 {
1580 pg_log_error("recovery timed out");
1582 }
1583
1584 /* Keep waiting */
1586
1587 timer += WAIT_INTERVAL;
1588 }
1589
1590 disconnect_database(conn, false);
1591
1592 if (status == POSTMASTER_STILL_STARTING)
1593 pg_fatal("server did not end recovery");
1594
1595 pg_log_info("target server reached the consistent state");
1596 pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1597}
#define pg_log_info_hint(...)
Definition: logging.h:130
#define WAIT_INTERVAL
#define USEC_PER_SEC
void pg_usleep(long microsec)
Definition: signal.c:53

References conn, connect_database(), disconnect_database(), dry_run, pg_fatal, pg_log_error, pg_log_info, pg_log_info_hint, pg_usleep(), POSTMASTER_READY, POSTMASTER_STILL_STARTING, recovery_ended, CreateSubscriberOptions::recovery_timeout, server_is_in_recovery(), stop_standby_server(), subscriber_dir, USEC_PER_SEC, and WAIT_INTERVAL.

Referenced by main().

Variable Documentation

◆ dbinfos

◆ dry_run

◆ num_dbs

◆ num_pubs

int num_pubs = 0
static

Definition at line 143 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_replslots

int num_replslots = 0
static

Definition at line 145 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_subs

int num_subs = 0
static

Definition at line 144 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ pg_ctl_path

char* pg_ctl_path = NULL
static

Definition at line 149 of file pg_createsubscriber.c.

Referenced by main(), start_standby_server(), and stop_standby_server().

◆ pg_resetwal_path

char* pg_resetwal_path = NULL
static

Definition at line 150 of file pg_createsubscriber.c.

Referenced by main(), and modify_subscriber_sysid().

◆ primary_slot_name

char* primary_slot_name = NULL
static

Definition at line 136 of file pg_createsubscriber.c.

Referenced by check_subscriber(), drop_primary_replication_slot(), and main().

◆ prng_state

pg_prng_state prng_state
static

Definition at line 147 of file pg_createsubscriber.c.

Referenced by generate_object_name(), and setup_publisher().

◆ progname

const char* progname
static

Definition at line 134 of file pg_createsubscriber.c.

Referenced by get_exec_path(), get_sub_conninfo(), main(), and usage().

◆ recovery_ended

bool recovery_ended = false
static

Definition at line 155 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and wait_for_end_recovery().

◆ standby_running

bool standby_running = false
static

◆ subscriber_dir

char* subscriber_dir = NULL
static

◆ success

bool success = false
static

Definition at line 139 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and main().