PostgreSQL Source Code  git master
pg_createsubscriber.c File Reference
#include "postgres_fe.h"
#include <sys/time.h>
#include <sys/wait.h>
#include "catalog/pg_authid_d.h"
#include "common/connect.h"
#include "common/controldata_utils.h"
#include "common/file_perm.h"
#include "common/logging.h"
#include "common/pg_prng.h"
#include "common/restricted_token.h"
#include "fe_utils/recovery_gen.h"
#include "fe_utils/simple_list.h"
#include "fe_utils/string_utils.h"
#include "getopt_long.h"
Include dependency graph for pg_createsubscriber.c:

Go to the source code of this file.

Data Structures

struct  CreateSubscriberOptions
 
struct  LogicalRepInfo
 

Macros

#define DEFAULT_SUB_PORT   "50432"
 
#define USEC_PER_SEC   1000000
 
#define WAIT_INTERVAL   1 /* 1 second */
 

Enumerations

enum  WaitPMResult {
  POSTMASTER_READY , POSTMASTER_STILL_STARTING ,
  POSTMASTER_SHUTDOWN_IN_RECOVERY , POSTMASTER_FAILED
}
 

Functions

static void cleanup_objects_atexit (void)
 
static void usage ()
 
static char * get_base_conninfo (const char *conninfo, char **dbname)
 
static char * get_sub_conninfo (const struct CreateSubscriberOptions *opt)
 
static char * get_exec_path (const char *argv0, const char *progname)
 
static void check_data_directory (const char *datadir)
 
static char * concat_conninfo_dbname (const char *conninfo, const char *dbname)
 
static struct LogicalRepInfo * store_pub_sub_info (const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
 
static PGconn * connect_database (const char *conninfo, bool exit_on_error)
 
static void disconnect_database (PGconn *conn, bool exit_on_error)
 
static uint64 get_primary_sysid (const char *conninfo)
 
static uint64 get_standby_sysid (const char *datadir)
 
static void modify_subscriber_sysid (const struct CreateSubscriberOptions *opt)
 
static bool server_is_in_recovery (PGconn *conn)
 
static char * generate_object_name (PGconn *conn)
 
static void check_publisher (const struct LogicalRepInfo *dbinfo)
 
static char * setup_publisher (struct LogicalRepInfo *dbinfo)
 
static void check_subscriber (const struct LogicalRepInfo *dbinfo)
 
static void setup_subscriber (struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 
static void setup_recovery (const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
 
static void drop_primary_replication_slot (struct LogicalRepInfo *dbinfo, const char *slotname)
 
static void drop_failover_replication_slots (struct LogicalRepInfo *dbinfo)
 
static char * create_logical_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void drop_replication_slot (PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
 
static void pg_ctl_status (const char *pg_ctl_cmd, int rc)
 
static void start_standby_server (const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
 
static void stop_standby_server (const char *datadir)
 
static void wait_for_end_recovery (const char *conninfo, const struct CreateSubscriberOptions *opt)
 
static void create_publication (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void drop_publication (PGconn *conn, struct LogicalRepInfo *dbinfo)
 
static void create_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void set_replication_progress (PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
 
static void enable_subscription (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void check_and_drop_existing_subscriptions (PGconn *conn, const struct LogicalRepInfo *dbinfo)
 
static void drop_existing_subscriptions (PGconn *conn, const char *subname, const char *dbname)
 
static void appendConnStrItem (PQExpBuffer buf, const char *keyword, const char *val)
 
int main (int argc, char **argv)
 

Variables

static const char * progname
 
static char * primary_slot_name = NULL
 
static bool dry_run = false
 
static bool success = false
 
static struct LogicalRepInfo * dbinfo
 
static int num_dbs = 0
 
static int num_pubs = 0
 
static int num_subs = 0
 
static int num_replslots = 0
 
static pg_prng_state prng_state
 
static char * pg_ctl_path = NULL
 
static char * pg_resetwal_path = NULL
 
static char * subscriber_dir = NULL
 
static bool recovery_ended = false
 
static bool standby_running = false
 

Macro Definition Documentation

◆ DEFAULT_SUB_PORT

#define DEFAULT_SUB_PORT   "50432"

Definition at line 32 of file pg_createsubscriber.c.

◆ USEC_PER_SEC

#define USEC_PER_SEC   1000000

Definition at line 111 of file pg_createsubscriber.c.

◆ WAIT_INTERVAL

#define WAIT_INTERVAL   1 /* 1 second */

Definition at line 112 of file pg_createsubscriber.c.

Enumeration Type Documentation

◆ WaitPMResult

Enumerator
POSTMASTER_READY 
POSTMASTER_STILL_STARTING 
POSTMASTER_SHUTDOWN_IN_RECOVERY 
POSTMASTER_FAILED 

Definition at line 138 of file pg_createsubscriber.c.

139 {
142 };
@ POSTMASTER_READY
@ POSTMASTER_STILL_STARTING

Function Documentation

◆ appendConnStrItem()

static void appendConnStrItem ( PQExpBuffer  buf,
const char *  keyword,
const char *  val 
)
static

Definition at line 249 of file pg_createsubscriber.c.

250 {
251  if (buf->len > 0)
253  appendPQExpBufferStr(buf, keyword);
256 }
long val
Definition: informix.c:689
static char * buf
Definition: pg_test_fsync.c:73
void appendPQExpBufferChar(PQExpBuffer str, char ch)
Definition: pqexpbuffer.c:378
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
void appendConnStrVal(PQExpBuffer buf, const char *str)
Definition: string_utils.c:545

References appendConnStrVal(), appendPQExpBufferChar(), appendPQExpBufferStr(), buf, and val.

Referenced by concat_conninfo_dbname(), get_base_conninfo(), and get_sub_conninfo().

◆ check_and_drop_existing_subscriptions()

static void check_and_drop_existing_subscriptions ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1103 of file pg_createsubscriber.c.

1105 {
1106  PQExpBuffer query = createPQExpBuffer();
1107  char *dbname;
1108  PGresult *res;
1109 
1110  Assert(conn != NULL);
1111 
1113 
1114  appendPQExpBuffer(query,
1115  "SELECT s.subname FROM pg_catalog.pg_subscription s "
1116  "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1117  "WHERE d.datname = %s",
1118  dbname);
1119  res = PQexec(conn, query->data);
1120 
1122  {
1123  pg_log_error("could not obtain pre-existing subscriptions: %s",
1125  disconnect_database(conn, true);
1126  }
1127 
1128  for (int i = 0; i < PQntuples(res); i++)
1130  dbinfo->dbname);
1131 
1132  PQclear(res);
1133  destroyPQExpBuffer(query);
1134 }
#define Assert(condition)
Definition: c.h:861
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
char * PQescapeLiteral(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4304
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
int i
Definition: isn.c:73
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:123
#define pg_log_error(...)
Definition: logging.h:106
static struct LogicalRepInfo * dbinfo
static void drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
static void disconnect_database(PGconn *conn, bool exit_on_error)
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
char * dbname
Definition: streamutil.c:52
PGconn * conn
Definition: streamutil.c:55

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), PQExpBufferData::data, dbinfo, LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), drop_existing_subscriptions(), i, pg_log_error, PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), and res.

Referenced by setup_subscriber().

◆ check_data_directory()

static void check_data_directory ( const char *  datadir)
static

Definition at line 377 of file pg_createsubscriber.c.

378 {
379  struct stat statbuf;
380  char versionfile[MAXPGPATH];
381 
382  pg_log_info("checking if directory \"%s\" is a cluster data directory",
383  datadir);
384 
385  if (stat(datadir, &statbuf) != 0)
386  {
387  if (errno == ENOENT)
388  pg_fatal("data directory \"%s\" does not exist", datadir);
389  else
390  pg_fatal("could not access directory \"%s\": %m", datadir);
391  }
392 
393  snprintf(versionfile, MAXPGPATH, "%s/PG_VERSION", datadir);
394  if (stat(versionfile, &statbuf) != 0 && errno == ENOENT)
395  {
396  pg_fatal("directory \"%s\" is not a database cluster directory",
397  datadir);
398  }
399 }
#define pg_log_info(...)
Definition: logging.h:124
#define pg_fatal(...)
#define MAXPGPATH
char * datadir
#define snprintf
Definition: port.h:238
#define stat
Definition: win32_port.h:284

References datadir, MAXPGPATH, pg_fatal, pg_log_info, snprintf, and stat.

Referenced by main().

◆ check_publisher()

static void check_publisher ( const struct LogicalRepInfo *  dbinfo)
static

Definition at line 841 of file pg_createsubscriber.c.

842 {
843  PGconn *conn;
844  PGresult *res;
845  bool failed = false;
846 
847  char *wal_level;
848  int max_repslots;
849  int cur_repslots;
850  int max_walsenders;
851  int cur_walsenders;
852  int max_prepared_transactions;
853 
854  pg_log_info("checking settings on publisher");
855 
856  conn = connect_database(dbinfo[0].pubconninfo, true);
857 
858  /*
859  * If the primary server is in recovery (i.e. cascading replication),
860  * objects (publication) cannot be created because it is read only.
861  */
863  {
864  pg_log_error("primary server cannot be in recovery");
865  disconnect_database(conn, true);
866  }
867 
868  /*------------------------------------------------------------------------
869  * Logical replication requires a few parameters to be set on publisher.
870  * Since these parameters are not a requirement for physical replication,
871  * we should check it to make sure it won't fail.
872  *
873  * - wal_level = logical
874  * - max_replication_slots >= current + number of dbs to be converted
875  * - max_wal_senders >= current + number of dbs to be converted
876  * -----------------------------------------------------------------------
877  */
878  res = PQexec(conn,
879  "SELECT pg_catalog.current_setting('wal_level'),"
880  " pg_catalog.current_setting('max_replication_slots'),"
881  " (SELECT count(*) FROM pg_catalog.pg_replication_slots),"
882  " pg_catalog.current_setting('max_wal_senders'),"
883  " (SELECT count(*) FROM pg_catalog.pg_stat_activity WHERE backend_type = 'walsender'),"
884  " pg_catalog.current_setting('max_prepared_transactions')");
885 
887  {
888  pg_log_error("could not obtain publisher settings: %s",
890  disconnect_database(conn, true);
891  }
892 
893  wal_level = pg_strdup(PQgetvalue(res, 0, 0));
894  max_repslots = atoi(PQgetvalue(res, 0, 1));
895  cur_repslots = atoi(PQgetvalue(res, 0, 2));
896  max_walsenders = atoi(PQgetvalue(res, 0, 3));
897  cur_walsenders = atoi(PQgetvalue(res, 0, 4));
898  max_prepared_transactions = atoi(PQgetvalue(res, 0, 5));
899 
900  PQclear(res);
901 
902  pg_log_debug("publisher: wal_level: %s", wal_level);
903  pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
904  pg_log_debug("publisher: current replication slots: %d", cur_repslots);
905  pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
906  pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
907  pg_log_debug("publisher: max_prepared_transactions: %d",
908  max_prepared_transactions);
909 
910  disconnect_database(conn, false);
911 
912  if (strcmp(wal_level, "logical") != 0)
913  {
914  pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
915  failed = true;
916  }
917 
918  if (max_repslots - cur_repslots < num_dbs)
919  {
920  pg_log_error("publisher requires %d replication slots, but only %d remain",
921  num_dbs, max_repslots - cur_repslots);
922  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
923  "max_replication_slots", cur_repslots + num_dbs);
924  failed = true;
925  }
926 
927  if (max_walsenders - cur_walsenders < num_dbs)
928  {
929  pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
930  num_dbs, max_walsenders - cur_walsenders);
931  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
932  "max_wal_senders", cur_walsenders + num_dbs);
933  failed = true;
934  }
935 
936  if (max_prepared_transactions != 0)
937  {
938  pg_log_warning("two_phase option will not be enabled for replication slots");
939  pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
940  "Prepared transactions will be replicated at COMMIT PREPARED.");
941  }
942 
944 
945  if (failed)
946  exit(1);
947 }
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
exit(1)
#define pg_log_error_hint(...)
Definition: logging.h:112
#define pg_log_warning_detail(...)
Definition: logging.h:118
#define pg_log_debug(...)
Definition: logging.h:133
static bool server_is_in_recovery(PGconn *conn)
static int num_dbs
static PGconn * connect_database(const char *conninfo, bool exit_on_error)
#define pg_log_warning(...)
Definition: pgfnames.c:24
int wal_level
Definition: xlog.c:130

References conn, connect_database(), dbinfo, disconnect_database(), exit(), num_dbs, pg_free(), pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_log_warning, pg_log_warning_detail, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQresultErrorMessage(), PQresultStatus(), res, server_is_in_recovery(), and wal_level.

Referenced by main().

◆ check_subscriber()

static void check_subscriber ( const struct LogicalRepInfo *  dbinfo)
static

Definition at line 961 of file pg_createsubscriber.c.

962 {
963  PGconn *conn;
964  PGresult *res;
965  bool failed = false;
966 
967  int max_lrworkers;
968  int max_repslots;
969  int max_wprocs;
970 
971  pg_log_info("checking settings on subscriber");
972 
973  conn = connect_database(dbinfo[0].subconninfo, true);
974 
975  /* The target server must be a standby */
977  {
978  pg_log_error("target server must be a standby");
979  disconnect_database(conn, true);
980  }
981 
982  /*------------------------------------------------------------------------
983  * Logical replication requires a few parameters to be set on subscriber.
984  * Since these parameters are not a requirement for physical replication,
985  * we should check it to make sure it won't fail.
986  *
987  * - max_replication_slots >= number of dbs to be converted
988  * - max_logical_replication_workers >= number of dbs to be converted
989  * - max_worker_processes >= 1 + number of dbs to be converted
990  *------------------------------------------------------------------------
991  */
992  res = PQexec(conn,
993  "SELECT setting FROM pg_catalog.pg_settings WHERE name IN ("
994  "'max_logical_replication_workers', "
995  "'max_replication_slots', "
996  "'max_worker_processes', "
997  "'primary_slot_name') "
998  "ORDER BY name");
999 
1001  {
1002  pg_log_error("could not obtain subscriber settings: %s",
1004  disconnect_database(conn, true);
1005  }
1006 
1007  max_lrworkers = atoi(PQgetvalue(res, 0, 0));
1008  max_repslots = atoi(PQgetvalue(res, 1, 0));
1009  max_wprocs = atoi(PQgetvalue(res, 2, 0));
1010  if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
1012 
1013  pg_log_debug("subscriber: max_logical_replication_workers: %d",
1014  max_lrworkers);
1015  pg_log_debug("subscriber: max_replication_slots: %d", max_repslots);
1016  pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
1017  if (primary_slot_name)
1018  pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
1019 
1020  PQclear(res);
1021 
1022  disconnect_database(conn, false);
1023 
1024  if (max_repslots < num_dbs)
1025  {
1026  pg_log_error("subscriber requires %d replication slots, but only %d remain",
1027  num_dbs, max_repslots);
1028  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1029  "max_replication_slots", num_dbs);
1030  failed = true;
1031  }
1032 
1033  if (max_lrworkers < num_dbs)
1034  {
1035  pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
1036  num_dbs, max_lrworkers);
1037  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1038  "max_logical_replication_workers", num_dbs);
1039  failed = true;
1040  }
1041 
1042  if (max_wprocs < num_dbs + 1)
1043  {
1044  pg_log_error("subscriber requires %d worker processes, but only %d remain",
1045  num_dbs + 1, max_wprocs);
1046  pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
1047  "max_worker_processes", num_dbs + 1);
1048  failed = true;
1049  }
1050 
1051  if (failed)
1052  exit(1);
1053 }
static char * primary_slot_name

References conn, connect_database(), dbinfo, disconnect_database(), exit(), num_dbs, pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQresultErrorMessage(), PQresultStatus(), primary_slot_name, res, and server_is_in_recovery().

Referenced by main().

◆ cleanup_objects_atexit()

static void cleanup_objects_atexit ( void  )
static

Definition at line 157 of file pg_createsubscriber.c.

158 {
159  if (success)
160  return;
161 
162  /*
163  * If the server is promoted, there is no way to use the current setup
164  * again. Warn the user that a new replication setup should be done before
165  * trying again.
166  */
167  if (recovery_ended)
168  {
169  pg_log_warning("failed after the end of recovery");
170  pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
171  "You must recreate the physical replica before continuing.");
172  }
173 
174  for (int i = 0; i < num_dbs; i++)
175  {
176  if (dbinfo[i].made_publication || dbinfo[i].made_replslot)
177  {
178  PGconn *conn;
179 
180  conn = connect_database(dbinfo[i].pubconninfo, false);
181  if (conn != NULL)
182  {
183  if (dbinfo[i].made_publication)
185  if (dbinfo[i].made_replslot)
186  drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname);
187  disconnect_database(conn, false);
188  }
189  else
190  {
191  /*
192  * If a connection could not be established, inform the user
193  * that some objects were left on primary and should be
194  * removed before trying again.
195  */
196  if (dbinfo[i].made_publication)
197  {
198  pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
199  dbinfo[i].pubname, dbinfo[i].dbname);
200  pg_log_warning_hint("Drop this publication before trying again.");
201  }
202  if (dbinfo[i].made_replslot)
203  {
204  pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
205  dbinfo[i].replslotname, dbinfo[i].dbname);
206  pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
207  }
208  }
209  }
210  }
211 
212  if (standby_running)
214 }
#define pg_log_warning_hint(...)
Definition: logging.h:121
static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void stop_standby_server(const char *datadir)
static char * subscriber_dir
static bool success
static bool recovery_ended
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *slot_name)
static bool standby_running

References conn, connect_database(), dbinfo, dbname, disconnect_database(), drop_publication(), drop_replication_slot(), i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, num_dbs, pg_log_warning, pg_log_warning_hint, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, recovery_ended, LogicalRepInfo::replslotname, standby_running, stop_standby_server(), subscriber_dir, and success.

Referenced by main().

◆ concat_conninfo_dbname()

static char * concat_conninfo_dbname ( const char *  conninfo,
const char *  dbname 
)
static

Definition at line 409 of file pg_createsubscriber.c.

410 {
412  char *ret;
413 
414  Assert(conninfo != NULL);
415 
416  appendPQExpBufferStr(buf, conninfo);
417  appendConnStrItem(buf, "dbname", dbname);
418 
419  ret = pg_strdup(buf->data);
421 
422  return ret;
423 }
static void appendConnStrItem(PQExpBuffer buf, const char *keyword, const char *val)

References appendConnStrItem(), appendPQExpBufferStr(), Assert, buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), and pg_strdup().

Referenced by store_pub_sub_info().

◆ connect_database()

static PGconn * connect_database ( const char *  conninfo,
bool  exit_on_error 
)
static

Definition at line 505 of file pg_createsubscriber.c.

506 {
507  PGconn *conn;
508  PGresult *res;
509 
510  conn = PQconnectdb(conninfo);
511  if (PQstatus(conn) != CONNECTION_OK)
512  {
513  pg_log_error("connection to database failed: %s",
515  PQfinish(conn);
516 
517  if (exit_on_error)
518  exit(1);
519  return NULL;
520  }
521 
522  /* Secure search_path */
525  {
526  pg_log_error("could not clear \"search_path\": %s",
528  PQclear(res);
529  PQfinish(conn);
530 
531  if (exit_on_error)
532  exit(1);
533  return NULL;
534  }
535  PQclear(res);
536 
537  return conn;
538 }
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7212
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7149
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4893
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:745
@ CONNECTION_OK
Definition: libpq-fe.h:81

References ALWAYS_SECURE_SEARCH_PATH_SQL, conn, CONNECTION_OK, exit(), pg_log_error, PGRES_TUPLES_OK, PQclear(), PQconnectdb(), PQerrorMessage(), PQexec(), PQfinish(), PQresultErrorMessage(), PQresultStatus(), PQstatus(), and res.

Referenced by check_publisher(), check_subscriber(), cleanup_objects_atexit(), drop_failover_replication_slots(), drop_primary_replication_slot(), get_primary_sysid(), setup_publisher(), setup_recovery(), setup_subscriber(), and wait_for_end_recovery().

◆ create_logical_replication_slot()

static char * create_logical_replication_slot ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1314 of file pg_createsubscriber.c.

1315 {
1317  PGresult *res = NULL;
1318  const char *slot_name = dbinfo->replslotname;
1319  char *slot_name_esc;
1320  char *lsn = NULL;
1321 
1322  Assert(conn != NULL);
1323 
1324  pg_log_info("creating the replication slot \"%s\" in database \"%s\"",
1325  slot_name, dbinfo->dbname);
1326 
1327  slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1328 
1330  "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)",
1331  slot_name_esc);
1332 
1333  pg_free(slot_name_esc);
1334 
1335  pg_log_debug("command is: %s", str->data);
1336 
1337  if (!dry_run)
1338  {
1339  res = PQexec(conn, str->data);
1341  {
1342  pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
1343  slot_name, dbinfo->dbname,
1345  PQclear(res);
1347  return NULL;
1348  }
1349 
1350  lsn = pg_strdup(PQgetvalue(res, 0, 0));
1351  PQclear(res);
1352  }
1353 
1354  /* For cleanup purposes */
1355  dbinfo->made_replslot = true;
1356 
1358 
1359  return lsn;
1360 }
const char * str
static bool dry_run

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_replslot, pg_free(), pg_log_debug, pg_log_error, pg_log_info, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQgetvalue(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::replslotname, res, and str.

Referenced by setup_publisher().

◆ create_publication()

static void create_publication ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1553 of file pg_createsubscriber.c.

1554 {
1556  PGresult *res;
1557  char *ipubname_esc;
1558  char *spubname_esc;
1559 
1560  Assert(conn != NULL);
1561 
1562  ipubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1563  spubname_esc = PQescapeLiteral(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1564 
1565  /* Check if the publication already exists */
1567  "SELECT 1 FROM pg_catalog.pg_publication "
1568  "WHERE pubname = %s",
1569  spubname_esc);
1570  res = PQexec(conn, str->data);
1572  {
1573  pg_log_error("could not obtain publication information: %s",
1575  disconnect_database(conn, true);
1576  }
1577 
1578  if (PQntuples(res) == 1)
1579  {
1580  /*
1581  * Unfortunately, if it reaches this code path, it will always fail
1582  * (unless you decide to change the existing publication name). That's
1583  * bad but it is very unlikely that the user will choose a name with
1584  * pg_createsubscriber_ prefix followed by the exact database oid and
1585  * a random number.
1586  */
1587  pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
1588  pg_log_error_hint("Consider renaming this publication before continuing.");
1589  disconnect_database(conn, true);
1590  }
1591 
1592  PQclear(res);
1594 
1595  pg_log_info("creating publication \"%s\" in database \"%s\"",
1597 
1598  appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
1599  ipubname_esc);
1600 
1601  pg_log_debug("command is: %s", str->data);
1602 
1603  if (!dry_run)
1604  {
1605  res = PQexec(conn, str->data);
1607  {
1608  pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
1610  disconnect_database(conn, true);
1611  }
1612  PQclear(res);
1613  }
1614 
1615  /* For cleanup purposes */
1616  dbinfo->made_publication = true;
1617 
1618  pg_free(ipubname_esc);
1619  pg_free(spubname_esc);
1621 }
char * PQescapeIdentifier(PGconn *conn, const char *str, size_t len)
Definition: fe-exec.c:4310
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:120
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, LogicalRepInfo::made_publication, pg_free(), pg_log_debug, pg_log_error, pg_log_error_hint, pg_log_info, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::pubname, res, resetPQExpBuffer(), and str.

Referenced by setup_publisher().

◆ create_subscription()

static void create_subscription ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1681 of file pg_createsubscriber.c.

1682 {
1684  PGresult *res;
1685  char *pubname_esc;
1686  char *subname_esc;
1687  char *pubconninfo_esc;
1688  char *replslotname_esc;
1689 
1690  Assert(conn != NULL);
1691 
1692  pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1693  subname_esc = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1694  pubconninfo_esc = PQescapeLiteral(conn, dbinfo->pubconninfo, strlen(dbinfo->pubconninfo));
1695  replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
1696 
1697  pg_log_info("creating subscription \"%s\" in database \"%s\"",
1699 
1701  "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
1702  "WITH (create_slot = false, enabled = false, "
1703  "slot_name = %s, copy_data = false)",
1704  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
1705 
1706  pg_free(pubname_esc);
1707  pg_free(subname_esc);
1708  pg_free(pubconninfo_esc);
1709  pg_free(replslotname_esc);
1710 
1711  pg_log_debug("command is: %s", str->data);
1712 
1713  if (!dry_run)
1714  {
1715  res = PQexec(conn, str->data);
1717  {
1718  pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
1720  disconnect_database(conn, true);
1721  }
1722  PQclear(res);
1723  }
1724 
1726 }

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQescapeIdentifier(), PQescapeLiteral(), PQexec(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, res, str, and LogicalRepInfo::subname.

Referenced by setup_subscriber().

◆ disconnect_database()

◆ drop_existing_subscriptions()

static void drop_existing_subscriptions ( PGconn *  conn,
const char *  subname,
const char *  dbname 
)
static

Definition at line 1062 of file pg_createsubscriber.c.

1063 {
1064  PQExpBuffer query = createPQExpBuffer();
1065  PGresult *res;
1066 
1067  Assert(conn != NULL);
1068 
1069  /*
1070  * Construct a query string. These commands are allowed to be executed
1071  * within a transaction.
1072  */
1073  appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
1074  subname);
1075  appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
1076  subname);
1077  appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
1078 
1079  pg_log_info("dropping subscription \"%s\" in database \"%s\"",
1080  subname, dbname);
1081 
1082  if (!dry_run)
1083  {
1084  res = PQexec(conn, query->data);
1085 
1087  {
1088  pg_log_error("could not drop subscription \"%s\": %s",
1090  disconnect_database(conn, true);
1091  }
1092 
1093  PQclear(res);
1094  }
1095 
1096  destroyPQExpBuffer(query);
1097 }
NameData subname

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), PQExpBufferData::data, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultErrorMessage(), PQresultStatus(), res, and subname.

Referenced by check_and_drop_existing_subscriptions().

◆ drop_failover_replication_slots()

static void drop_failover_replication_slots ( struct LogicalRepInfo *  dbinfo)
static

Definition at line 1272 of file pg_createsubscriber.c.

1273 {
1274  PGconn *conn;
1275  PGresult *res;
1276 
1277  conn = connect_database(dbinfo[0].subconninfo, false);
1278  if (conn != NULL)
1279  {
1280  /* Get failover replication slot names */
1281  res = PQexec(conn,
1282  "SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover");
1283 
1285  {
1286  /* Remove failover replication slots from subscriber */
1287  for (int i = 0; i < PQntuples(res); i++)
1289  }
1290  else
1291  {
1292  pg_log_warning("could not obtain failover replication slot information: %s",
1294  pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1295  }
1296 
1297  PQclear(res);
1298  disconnect_database(conn, false);
1299  }
1300  else
1301  {
1302  pg_log_warning("could not drop failover replication slot");
1303  pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
1304  }
1305 }

References conn, connect_database(), dbinfo, disconnect_database(), drop_replication_slot(), i, pg_log_warning, pg_log_warning_hint, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), and res.

Referenced by main().

◆ drop_primary_replication_slot()

static void drop_primary_replication_slot ( struct LogicalRepInfo *  dbinfo,
const char *  slotname 
)
static

Definition at line 1242 of file pg_createsubscriber.c.

1243 {
1244  PGconn *conn;
1245 
1246  /* Replication slot does not exist, do nothing */
1247  if (!primary_slot_name)
1248  return;
1249 
1250  conn = connect_database(dbinfo[0].pubconninfo, false);
1251  if (conn != NULL)
1252  {
1253  drop_replication_slot(conn, &dbinfo[0], slotname);
1254  disconnect_database(conn, false);
1255  }
1256  else
1257  {
1258  pg_log_warning("could not drop replication slot \"%s\" on primary",
1259  slotname);
1260  pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
1261  }
1262 }

References conn, connect_database(), dbinfo, disconnect_database(), drop_replication_slot(), pg_log_warning, pg_log_warning_hint, and primary_slot_name.

Referenced by main().

◆ drop_publication()

static void drop_publication ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1627 of file pg_createsubscriber.c.

1628 {
1629  PQExpBuffer str = createPQExpBuffer();
1630  PGresult *res;
1631  char *pubname_esc;
1632 
1633  Assert(conn != NULL);
1634 
1635  pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
1636 
1637  pg_log_info("dropping publication \"%s\" in database \"%s\"",
1638  dbinfo->pubname, dbinfo->dbname);
1639 
1640  appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
1641 
1642  pg_free(pubname_esc);
1643 
1644  pg_log_debug("command is: %s", str->data);
1645 
1646  if (!dry_run)
1647  {
1648  res = PQexec(conn, str->data);
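1649  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1650  {
1651  pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
1652  dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));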
1653  dbinfo->made_publication = false; /* don't try again. */
1654 
1655  /*
1656  * Don't disconnect and exit here. This routine is used by primary
1657  * (cleanup publication / replication slot due to an error) and
1658  * subscriber (remove the replicated publications). In both cases,
1659  * it can continue and provide instructions for the user to remove
1660  * it later if cleanup fails.
1661  */
1662  }
1663  PQclear(res);
1664  }
1665 
1666  destroyPQExpBuffer(str);
1667 }

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_publication, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQescapeIdentifier(), PQexec(), PQresultErrorMessage(), PQresultStatus(), LogicalRepInfo::pubname, res, and str.

Referenced by cleanup_objects_atexit(), and setup_subscriber().

◆ drop_replication_slot()

static void drop_replication_slot ( PGconn *  conn,
struct LogicalRepInfo *  dbinfo,
const char *  slot_name 
)
static

Definition at line 1363 of file pg_createsubscriber.c.

1365 {
1366  PQExpBuffer str = createPQExpBuffer();
1367  char *slot_name_esc;
1368  PGresult *res;
1369 
1370  Assert(conn != NULL);
1371 
1372  pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
1373  slot_name, dbinfo->dbname);
1374 
1375  slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
1376 
1377  appendPQExpBuffer(str, "SELECT pg_catalog.pg_drop_replication_slot(%s)", slot_name_esc);
1378 
1379  pg_free(slot_name_esc);
1380 
1381  pg_log_debug("command is: %s", str->data);
1382 
1383  if (!dry_run)
1384  {
1385  res = PQexec(conn, str->data);
1386  if (PQresultStatus(res) != PGRES_TUPLES_OK)
1387  {
1388  pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
1389  slot_name, dbinfo->dbname, PQresultErrorMessage(res));
1390  dbinfo->made_replslot = false; /* don't try again. */
1391  }
1392 
1393  PQclear(res);
1394  }
1395 
1396  destroyPQExpBuffer(str);
1397 }

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, destroyPQExpBuffer(), dry_run, LogicalRepInfo::made_replslot, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQresultErrorMessage(), PQresultStatus(), res, and str.

Referenced by cleanup_objects_atexit(), drop_failover_replication_slots(), and drop_primary_replication_slot().

◆ enable_subscription()

static void enable_subscription ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo 
)
static

Definition at line 1830 of file pg_createsubscriber.c.

1831 {
1832  PQExpBuffer str = createPQExpBuffer();
1833  PGresult *res;
1834  char *subname;
1835 
1836  Assert(conn != NULL);
1837 
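1838  subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
1839 
1840  pg_log_info("enabling subscription \"%s\" in database \"%s\"",
1841  dbinfo->subname, dbinfo->dbname);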
1842 
1843  appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
1844 
1845  pg_log_debug("command is: %s", str->data);
1846 
1847  if (!dry_run)
1848  {
1849  res = PQexec(conn, str->data);
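1850  if (PQresultStatus(res) != PGRES_COMMAND_OK)
1851  {
1852  pg_log_error("could not enable subscription \"%s\": %s",
1853  dbinfo->subname, PQresultErrorMessage(res));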
1854  disconnect_database(conn, true);
1855  }
1856 
1857  PQclear(res);
1858  }
1859 
1860  pg_free(subname);
1861  destroyPQExpBuffer(str);
1862 }

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_COMMAND_OK, PQclear(), PQescapeIdentifier(), PQexec(), PQresultErrorMessage(), PQresultStatus(), res, str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ generate_object_name()

static char * generate_object_name ( PGconn *  conn)
static

Definition at line 685 of file pg_createsubscriber.c.

686 {
687  PGresult *res;
688  Oid oid;
689  uint32 rand;
690  char *objname;
691 
692  res = PQexec(conn,
693  "SELECT oid FROM pg_catalog.pg_database "
694  "WHERE datname = pg_catalog.current_database()");
695  if (PQresultStatus(res) != PGRES_TUPLES_OK)
696  {
697  pg_log_error("could not obtain database OID: %s",
698  PQresultErrorMessage(res));
699  disconnect_database(conn, true);
700  }
701 
702  if (PQntuples(res) != 1)
703  {
704  pg_log_error("could not obtain database OID: got %d rows, expected %d row",
705  PQntuples(res), 1);
706  disconnect_database(conn, true);
707  }
708 
709  /* Database OID */
710  oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
711 
712  PQclear(res);
713 
714  /* Random unsigned integer */
715  rand = pg_prng_uint32(&prng_state);
716 
717  /*
718  * Build the object name. The name must not exceed NAMEDATALEN - 1. This
719  * current schema uses a maximum of 40 characters (20 + 10 + 1 + 8 +
720  * '\0').
721  */
722  objname = psprintf("pg_createsubscriber_%u_%x", oid, rand);
723 
724  return objname;
725 }
unsigned int uint32
Definition: c.h:509
static pg_prng_state prng_state
uint32 pg_prng_uint32(pg_prng_state *state)
Definition: pg_prng.c:227
unsigned int Oid
Definition: postgres_ext.h:31
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46

References conn, disconnect_database(), pg_log_error, pg_prng_uint32(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), prng_state, psprintf(), and res.

Referenced by setup_publisher().

◆ get_base_conninfo()

static char * get_base_conninfo ( const char *  conninfo,
char **  dbname 
)
static

Definition at line 271 of file pg_createsubscriber.c.

272 {
274  PQconninfoOption *conn_opts;
275  PQconninfoOption *conn_opt;
276  char *errmsg = NULL;
277  char *ret;
278 
279  conn_opts = PQconninfoParse(conninfo, &errmsg);
280  if (conn_opts == NULL)
281  {
282  pg_log_error("could not parse connection string: %s", errmsg);
283  PQfreemem(errmsg);
284  return NULL;
285  }
286 
288  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
289  {
290  if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
291  {
292  if (strcmp(conn_opt->keyword, "dbname") == 0)
293  {
294  if (dbname)
295  *dbname = pg_strdup(conn_opt->val);
296  continue;
297  }
298  appendConnStrItem(buf, conn_opt->keyword, conn_opt->val);
299  }
300  }
301 
302  ret = pg_strdup(buf->data);
303 
305  PQconninfoFree(conn_opts);
306 
307  return ret;
308 }
int errmsg(const char *fmt,...)
Definition: elog.c:1070
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:5753
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:7035
void PQfreemem(void *ptr)
Definition: fe-exec.c:4032

References appendConnStrItem(), buf, createPQExpBuffer(), dbname, destroyPQExpBuffer(), errmsg(), _PQconninfoOption::keyword, pg_log_error, pg_strdup(), PQconninfoFree(), PQconninfoParse(), PQfreemem(), and _PQconninfoOption::val.

Referenced by main().

◆ get_exec_path()

static char * get_exec_path ( const char *  argv0,
const char *  progname 
)
static

Definition at line 341 of file pg_createsubscriber.c.

342 {
343  char *versionstr;
344  char *exec_path;
345  int ret;
346 
347  versionstr = psprintf("%s (PostgreSQL) %s\n", progname, PG_VERSION);
348  exec_path = pg_malloc(MAXPGPATH);
349  ret = find_other_exec(argv0, progname, versionstr, exec_path);
350 
351  if (ret < 0)
352  {
353  char full_path[MAXPGPATH];
354 
355  if (find_my_exec(argv0, full_path) < 0)
356  strlcpy(full_path, progname, sizeof(full_path));
357 
358  if (ret == -1)
359  pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
360  progname, "pg_createsubscriber", full_path);
361  else
362  pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
363  progname, full_path, "pg_createsubscriber");
364  }
365 
366  pg_log_debug("%s path is: %s", progname, exec_path);
367 
368  return exec_path;
369 }
int find_my_exec(const char *argv0, char *retpath)
Definition: exec.c:160
int find_other_exec(const char *argv0, const char *target, const char *versionstr, char *retpath)
Definition: exec.c:310
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static const char * progname
static char * argv0
Definition: pg_ctl.c:93
static char * exec_path
Definition: pg_ctl.c:88
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45

References argv0, exec_path, find_my_exec(), find_other_exec(), MAXPGPATH, pg_fatal, pg_log_debug, pg_malloc(), progname, psprintf(), and strlcpy().

Referenced by main().

◆ get_primary_sysid()

static uint64 get_primary_sysid ( const char *  conninfo)
static

Definition at line 560 of file pg_createsubscriber.c.

561 {
562  PGconn *conn;
563  PGresult *res;
564  uint64 sysid;
565 
566  pg_log_info("getting system identifier from publisher");
567 
568  conn = connect_database(conninfo, true);
569 
570  res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
571  if (PQresultStatus(res) != PGRES_TUPLES_OK)
572  {
573  pg_log_error("could not get system identifier: %s",
574  PQresultErrorMessage(res));
575  disconnect_database(conn, true);
576  }
577  if (PQntuples(res) != 1)
578  {
579  pg_log_error("could not get system identifier: got %d rows, expected %d row",
580  PQntuples(res), 1);
581  disconnect_database(conn, true);
582  }
583 
584  sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
585 
586  pg_log_info("system identifier is %llu on publisher",
587  (unsigned long long) sysid);
588 
589  PQclear(res);
590  disconnect_database(conn, false);
591 
592  return sysid;
593 }
#define strtou64(str, endptr, base)
Definition: c.h:1301

References conn, connect_database(), disconnect_database(), pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), res, and strtou64.

Referenced by main().

◆ get_standby_sysid()

static uint64 get_standby_sysid ( const char *  datadir)
static

Definition at line 601 of file pg_createsubscriber.c.

602 {
603  ControlFileData *cf;
604  bool crc_ok;
605  uint64 sysid;
606 
607  pg_log_info("getting system identifier from subscriber");
608 
609  cf = get_controlfile(datadir, &crc_ok);
610  if (!crc_ok)
611  pg_fatal("control file appears to be corrupt");
612 
613  sysid = cf->system_identifier;
614 
615  pg_log_info("system identifier is %llu on subscriber",
616  (unsigned long long) sysid);
617 
618  pg_free(cf);
619 
620  return sysid;
621 }
ControlFileData * get_controlfile(const char *DataDir, bool *crc_ok_p)
uint64 system_identifier
Definition: pg_control.h:110

References datadir, get_controlfile(), pg_fatal, pg_free(), pg_log_info, and ControlFileData::system_identifier.

Referenced by main().

◆ get_sub_conninfo()

static char * get_sub_conninfo ( const struct CreateSubscriberOptions *  opt)
static

Definition at line 315 of file pg_createsubscriber.c.

316 {
317  PQExpBuffer buf = createPQExpBuffer();
318  char *ret;
319 
320  appendConnStrItem(buf, "port", opt->sub_port);
321 #if !defined(WIN32)
322  appendConnStrItem(buf, "host", opt->socket_dir);
323 #endif
324  if (opt->sub_username != NULL)
325  appendConnStrItem(buf, "user", opt->sub_username);
326  appendConnStrItem(buf, "fallback_application_name", progname);
327 
328  ret = pg_strdup(buf->data);
329 
330  destroyPQExpBuffer(buf);
331 
332  return ret;
333 }

References appendConnStrItem(), buf, createPQExpBuffer(), destroyPQExpBuffer(), pg_strdup(), progname, CreateSubscriberOptions::socket_dir, CreateSubscriberOptions::sub_port, and CreateSubscriberOptions::sub_username.

Referenced by main().

◆ main()

int main ( int  argc,
char **  argv 
)

Definition at line 1865 of file pg_createsubscriber.c.

1866 {
1867  static struct option long_options[] =
1868  {
1869  {"database", required_argument, NULL, 'd'},
1870  {"pgdata", required_argument, NULL, 'D'},
1871  {"dry-run", no_argument, NULL, 'n'},
1872  {"subscriber-port", required_argument, NULL, 'p'},
1873  {"publisher-server", required_argument, NULL, 'P'},
1874  {"socketdir", required_argument, NULL, 's'},
1875  {"recovery-timeout", required_argument, NULL, 't'},
1876  {"subscriber-username", required_argument, NULL, 'U'},
1877  {"verbose", no_argument, NULL, 'v'},
1878  {"version", no_argument, NULL, 'V'},
1879  {"help", no_argument, NULL, '?'},
1880  {"config-file", required_argument, NULL, 1},
1881  {"publication", required_argument, NULL, 2},
1882  {"replication-slot", required_argument, NULL, 3},
1883  {"subscription", required_argument, NULL, 4},
1884  {NULL, 0, NULL, 0}
1885  };
1886 
1887  struct CreateSubscriberOptions opt = {0};
1888 
1889  int c;
1890  int option_index;
1891 
1892  char *pub_base_conninfo;
1893  char *sub_base_conninfo;
1894  char *dbname_conninfo = NULL;
1895 
1896  uint64 pub_sysid;
1897  uint64 sub_sysid;
1898  struct stat statbuf;
1899 
1900  char *consistent_lsn;
1901 
1902  char pidfile[MAXPGPATH];
1903 
1904  pg_logging_init(argv[0]);
1905  pg_logging_set_level(PG_LOG_WARNING);
1906  progname = get_progname(argv[0]);
1907  set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
1908 
1909  if (argc > 1)
1910  {
1911  if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
1912  {
1913  usage();
1914  exit(0);
1915  }
1916  else if (strcmp(argv[1], "-V") == 0
1917  || strcmp(argv[1], "--version") == 0)
1918  {
1919  puts("pg_createsubscriber (PostgreSQL) " PG_VERSION);
1920  exit(0);
1921  }
1922  }
1923 
1924  /* Default settings */
1925  subscriber_dir = NULL;
1926  opt.config_file = NULL;
1927  opt.pub_conninfo_str = NULL;
1928  opt.socket_dir = NULL;
1929  opt.sub_port = DEFAULT_SUB_PORT;
1930  opt.sub_username = NULL;
1931  opt.database_names = (SimpleStringList)
1932  {
1933  0
1934  };
1935  opt.recovery_timeout = 0;
1936 
1937  /*
1938  * Don't allow it to be run as root. It uses pg_ctl which does not allow
1939  * it either.
1940  */
1941 #ifndef WIN32
1942  if (geteuid() == 0)
1943  {
1944  pg_log_error("cannot be executed by \"root\"");
1945  pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
1946  progname);
1947  exit(1);
1948  }
1949 #endif
1950 
1951  get_restricted_token();
1952 
1953  while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
1954  long_options, &option_index)) != -1)
1955  {
1956  switch (c)
1957  {
1958  case 'd':
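1959  if (!simple_string_list_member(&opt.database_names, optarg))
1960  {
1961  simple_string_list_append(&opt.database_names, optarg);
1962  num_dbs++;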
1963  }
1964  else
1965  {
1966  pg_log_error("database \"%s\" specified more than once", optarg);
1967  exit(1);
1968  }
1969  break;
1970  case 'D':
1971  subscriber_dir = pg_strdup(optarg);
1972  canonicalize_path(subscriber_dir);
1973  break;
1974  case 'n':
1975  dry_run = true;
1976  break;
1977  case 'p':
1978  opt.sub_port = pg_strdup(optarg);
1979  break;
1980  case 'P':
1981  opt.pub_conninfo_str = pg_strdup(optarg);
1982  break;
1983  case 's':
1984  opt.socket_dir = pg_strdup(optarg);
1985  canonicalize_path(opt.socket_dir);
1986  break;
1987  case 't':
1988  opt.recovery_timeout = atoi(optarg);
1989  break;
1990  case 'U':
1991  opt.sub_username = pg_strdup(optarg);
1992  break;
1993  case 'v':
1994  pg_logging_increase_verbosity();
1995  break;
1996  case 1:
1997  opt.config_file = pg_strdup(optarg);
1998  break;
1999  case 2:
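2000  if (!simple_string_list_member(&opt.pub_names, optarg))
2001  {
2002  simple_string_list_append(&opt.pub_names, optarg);
2003  num_pubs++;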
2004  }
2005  else
2006  {
2007  pg_log_error("publication \"%s\" specified more than once", optarg);
2008  exit(1);
2009  }
2010  break;
2011  case 3:
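2012  if (!simple_string_list_member(&opt.replslot_names, optarg))
2013  {
2014  simple_string_list_append(&opt.replslot_names, optarg);
2015  num_replslots++;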
2016  }
2017  else
2018  {
2019  pg_log_error("replication slot \"%s\" specified more than once", optarg);
2020  exit(1);
2021  }
2022  break;
2023  case 4:
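2024  if (!simple_string_list_member(&opt.sub_names, optarg))
2025  {
2026  simple_string_list_append(&opt.sub_names, optarg);
2027  num_subs++;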
2028  }
2029  else
2030  {
2031  pg_log_error("subscription \"%s\" specified more than once", optarg);
2032  exit(1);
2033  }
2034  break;
2035  default:
2036  /* getopt_long already emitted a complaint */
2037  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2038  exit(1);
2039  }
2040  }
2041 
2042  /* Any non-option arguments? */
2043  if (optind < argc)
2044  {
2045  pg_log_error("too many command-line arguments (first is \"%s\")",
2046  argv[optind]);
2047  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2048  exit(1);
2049  }
2050 
2051  /* Required arguments */
2052  if (subscriber_dir == NULL)
2053  {
2054  pg_log_error("no subscriber data directory specified");
2055  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2056  exit(1);
2057  }
2058 
2059  /* If socket directory is not provided, use the current directory */
2060  if (opt.socket_dir == NULL)
2061  {
2062  char cwd[MAXPGPATH];
2063 
2064  if (!getcwd(cwd, MAXPGPATH))
2065  pg_fatal("could not determine current directory");
2066  opt.socket_dir = pg_strdup(cwd);
2067  canonicalize_path(opt.socket_dir);
2068  }
2069 
2070  /*
2071  * Parse connection string. Build a base connection string that might be
2072  * reused by multiple databases.
2073  */
2074  if (opt.pub_conninfo_str == NULL)
2075  {
2076  /*
2077  * TODO use primary_conninfo (if available) from subscriber and
2078  * extract publisher connection string. Assume that there are
2079  * identical entries for physical and logical replication. If there is
2080  * not, we would fail anyway.
2081  */
2082  pg_log_error("no publisher connection string specified");
2083  pg_log_error_hint("Try \"%s --help\" for more information.", progname);
2084  exit(1);
2085  }
2086  pg_log_info("validating publisher connection string");
2087  pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
2088  &dbname_conninfo);
2089  if (pub_base_conninfo == NULL)
2090  exit(1);
2091 
2092  pg_log_info("validating subscriber connection string");
2093  sub_base_conninfo = get_sub_conninfo(&opt);
2094 
2095  if (opt.database_names.head == NULL)
2096  {
2097  pg_log_info("no database was specified");
2098 
2099  /*
2100  * If --database option is not provided, try to obtain the dbname from
2101  * the publisher conninfo. If dbname parameter is not available, error
2102  * out.
2103  */
2104  if (dbname_conninfo)
2105  {
2106  simple_string_list_append(&opt.database_names, dbname_conninfo);
2107  num_dbs++;
2108 
2109  pg_log_info("database name \"%s\" was extracted from the publisher connection string",
2110  dbname_conninfo);
2111  }
2112  else
2113  {
2114  pg_log_error("no database name specified");
2115  pg_log_error_hint("Try \"%s --help\" for more information.",
2116  progname);
2117  exit(1);
2118  }
2119  }
2120 
2121  /* Number of object names must match number of databases */
2122  if (num_pubs > 0 && num_pubs != num_dbs)
2123  {
2124  pg_log_error("wrong number of publication names specified");
2125  pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
2126  num_pubs, num_dbs);
2127  exit(1);
2128  }
2129  if (num_subs > 0 && num_subs != num_dbs)
2130  {
2131  pg_log_error("wrong number of subscription names specified");
2132  pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
2133  num_subs, num_dbs);
2134  exit(1);
2135  }
2136  if (num_replslots > 0 && num_replslots != num_dbs)
2137  {
2138  pg_log_error("wrong number of replication slot names specified");
2139  pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
2140  num_replslots, num_dbs);
2141  exit(1);
2142  }
2143 
2144  /* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
2145  pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
2146  pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
2147 
2148  /* Rudimentary check for a data directory */
2149  check_data_directory(subscriber_dir);
2150 
2151  /*
2152  * Store database information for publisher and subscriber. It should be
2153  * called before atexit() because its return is used in the
2154  * cleanup_objects_atexit().
2155  */
2156  dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo);
2157 
2158  /* Register a function to clean up objects in case of failure */
2159  atexit(cleanup_objects_atexit);
2160 
2161  /*
2162  * Check if the subscriber data directory has the same system identifier
2163  * than the publisher data directory.
2164  */
2165  pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo);
2166  sub_sysid = get_standby_sysid(subscriber_dir);
2167  if (pub_sysid != sub_sysid)
2168  pg_fatal("subscriber data directory is not a copy of the source database cluster");
2169 
2170  /* Subscriber PID file */
2171  snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
2172 
2173  /*
2174  * The standby server must not be running. If the server is started under
2175  * service manager and pg_createsubscriber stops it, the service manager
2176  * might react to this action and start the server again. Therefore,
2177  * refuse to proceed if the server is running to avoid possible failures.
2178  */
2179  if (stat(pidfile, &statbuf) == 0)
2180  {
2181  pg_log_error("standby server is running");
2182  pg_log_error_hint("Stop the standby server and try again.");
2183  exit(1);
2184  }
2185 
2186  /*
2187  * Start a short-lived standby server with temporary parameters (provided
2188  * by command-line options). The goal is to avoid connections during the
2189  * transformation steps.
2190  */
2191  pg_log_info("starting the standby server with command-line options");
2192  start_standby_server(&opt, true, false);
2193 
2194  /* Check if the standby server is ready for logical replication */
2195  check_subscriber(dbinfo);
2196 
2197  /* Check if the primary server is ready for logical replication */
2198  check_publisher(dbinfo);
2199 
2200  /*
2201  * Stop the target server. The recovery process requires that the server
2202  * reaches a consistent state before targeting the recovery stop point.
2203  * Make sure a consistent state is reached (stop the target server
2204  * guarantees it) *before* creating the replication slots in
2205  * setup_publisher().
2206  */
2207  pg_log_info("stopping the subscriber");
2208  stop_standby_server(subscriber_dir);
2209 
2210  /* Create the required objects for each database on publisher */
2211  consistent_lsn = setup_publisher(dbinfo);
2212 
2213  /* Write the required recovery parameters */
2214  setup_recovery(dbinfo, subscriber_dir, consistent_lsn);
2215 
2216  /*
2217  * Start subscriber so the recovery parameters will take effect. Wait
2218  * until accepting connections. We don't want to start logical replication
2219  * during setup.
2220  */
2221  pg_log_info("starting the subscriber");
2222  start_standby_server(&opt, true, true);
2223 
2224  /* Waiting the subscriber to be promoted */
2225  wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
2226 
2227  /*
2228  * Create the subscription for each database on subscriber. It does not
2229  * enable it immediately because it needs to adjust the replication start
2230  * point to the LSN reported by setup_publisher(). It also cleans up
2231  * publications created by this tool and replication to the standby.
2232  */
2233  setup_subscriber(dbinfo, consistent_lsn);
2234 
2235  /* Remove primary_slot_name if it exists on primary */
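2236  drop_primary_replication_slot(dbinfo, primary_slot_name);
2237 
2238  /* Remove failover replication slots if they exist on subscriber */
2239  drop_failover_replication_slots(dbinfo);
2240 
2241  /* Stop the subscriber */
2242  pg_log_info("stopping the subscriber");
2243  stop_standby_server(subscriber_dir);
2244 
2245  /* Change system identifier from subscriber */
2246  modify_subscriber_sysid(&opt);
2247 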
2248  success = true;
2249 
2250  pg_log_info("Done!");
2251 
2252  return 0;
2253 }
#define PG_TEXTDOMAIN(domain)
Definition: c.h:1217
void set_pglocale_pgservice(const char *argv0, const char *app)
Definition: exec.c:429
int getopt_long(int argc, char *const argv[], const char *optstring, const struct option *longopts, int *longindex)
Definition: getopt_long.c:60
#define no_argument
Definition: getopt_long.h:24
#define required_argument
Definition: getopt_long.h:25
void pg_logging_increase_verbosity(void)
Definition: logging.c:184
void pg_logging_init(const char *argv0)
Definition: logging.c:83
void pg_logging_set_level(enum pg_log_level new_level)
Definition: logging.c:175
@ PG_LOG_WARNING
Definition: logging.h:38
#define pg_log_error_detail(...)
Definition: logging.h:109
static struct LogicalRepInfo * store_pub_sub_info(const struct CreateSubscriberOptions *opt, const char *pub_base_conninfo, const char *sub_base_conninfo)
static void modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
static char * pg_ctl_path
static char * get_exec_path(const char *argv0, const char *progname)
static void check_publisher(const struct LogicalRepInfo *dbinfo)
static int num_subs
static void cleanup_objects_atexit(void)
#define DEFAULT_SUB_PORT
static int num_replslots
static void check_subscriber(const struct LogicalRepInfo *dbinfo)
static void check_data_directory(const char *datadir)
static void usage()
static char * setup_publisher(struct LogicalRepInfo *dbinfo)
static void setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
static char * get_base_conninfo(const char *conninfo, char **dbname)
static uint64 get_standby_sysid(const char *datadir)
static int num_pubs
static void start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access, bool restrict_logical_worker)
static char * get_sub_conninfo(const struct CreateSubscriberOptions *opt)
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotname)
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
static char * pg_resetwal_path
static uint64 get_primary_sysid(const char *conninfo)
static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const char *lsn)
PGDLLIMPORT int optind
Definition: getopt.c:50
PGDLLIMPORT char * optarg
Definition: getopt.c:52
void canonicalize_path(char *path)
Definition: path.c:265
const char * get_progname(const char *argv0)
Definition: path.c:575
char * c
void get_restricted_token(void)
bool simple_string_list_member(SimpleStringList *list, const char *val)
Definition: simple_list.c:87
void simple_string_list_append(SimpleStringList *list, const char *val)
Definition: simple_list.c:63
struct SimpleStringList SimpleStringList
SimpleStringList database_names
SimpleStringList replslot_names
SimpleStringListCell * head
Definition: simple_list.h:42

References canonicalize_path(), check_data_directory(), check_publisher(), check_subscriber(), cleanup_objects_atexit(), CreateSubscriberOptions::config_file, CreateSubscriberOptions::database_names, dbinfo, DEFAULT_SUB_PORT, drop_failover_replication_slots(), drop_primary_replication_slot(), dry_run, exit(), get_base_conninfo(), get_exec_path(), get_primary_sysid(), get_progname(), get_restricted_token(), get_standby_sysid(), get_sub_conninfo(), getopt_long(), SimpleStringList::head, MAXPGPATH, modify_subscriber_sysid(), no_argument, num_dbs, num_pubs, num_replslots, num_subs, optarg, optind, pg_ctl_path, pg_fatal, pg_log_error, pg_log_error_detail, pg_log_error_hint, pg_log_info, PG_LOG_WARNING, pg_logging_increase_verbosity(), pg_logging_init(), pg_logging_set_level(), pg_resetwal_path, pg_strdup(), PG_TEXTDOMAIN, primary_slot_name, progname, CreateSubscriberOptions::pub_conninfo_str, CreateSubscriberOptions::pub_names, CreateSubscriberOptions::recovery_timeout, CreateSubscriberOptions::replslot_names, required_argument, set_pglocale_pgservice(), setup_publisher(), setup_recovery(), setup_subscriber(), simple_string_list_append(), simple_string_list_member(), snprintf, CreateSubscriberOptions::socket_dir, start_standby_server(), stat, stop_standby_server(), store_pub_sub_info(), CreateSubscriberOptions::sub_names, CreateSubscriberOptions::sub_port, CreateSubscriberOptions::sub_username, subscriber_dir, success, usage(), and wait_for_end_recovery().

◆ modify_subscriber_sysid()

static void modify_subscriber_sysid ( const struct CreateSubscriberOptions *  opt)
static

Definition at line 629 of file pg_createsubscriber.c.

630 {
631  ControlFileData *cf;
632  bool crc_ok;
633  struct timeval tv;
634 
635  char *cmd_str;
636 
637  pg_log_info("modifying system identifier of subscriber");
638 
639  cf = get_controlfile(subscriber_dir, &crc_ok);
640  if (!crc_ok)
641  pg_fatal("control file appears to be corrupt");
642 
643  /*
644  * Select a new system identifier.
645  *
646  * XXX this code was extracted from BootStrapXLOG().
647  */
648  gettimeofday(&tv, NULL);
649  cf->system_identifier = ((uint64) tv.tv_sec) << 32;
650  cf->system_identifier |= ((uint64) tv.tv_usec) << 12;
651  cf->system_identifier |= getpid() & 0xFFF;
652 
653  if (!dry_run)
654  update_controlfile(subscriber_dir, cf, true);
655 
656  pg_log_info("system identifier is %llu on subscriber",
657  (unsigned long long) cf->system_identifier);
658 
659  pg_log_info("running pg_resetwal on the subscriber");
660 
661  cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
662  subscriber_dir, DEVNULL);
663 
664  pg_log_debug("pg_resetwal command is: %s", cmd_str);
665 
666  if (!dry_run)
667  {
668  int rc = system(cmd_str);
669 
670  if (rc == 0)
671  pg_log_info("subscriber successfully changed the system identifier");
672  else
673  pg_fatal("could not change system identifier of subscriber: %s", wait_result_to_str(rc));
674  }
675 
676  pg_free(cf);
677 }
void update_controlfile(const char *DataDir, ControlFileData *ControlFile, bool do_sync)
#define DEVNULL
Definition: port.h:160
char * wait_result_to_str(int exitstatus)
Definition: wait_error.c:33
int gettimeofday(struct timeval *tp, void *tzp)

References DEVNULL, dry_run, get_controlfile(), gettimeofday(), pg_fatal, pg_free(), pg_log_debug, pg_log_info, pg_resetwal_path, psprintf(), subscriber_dir, ControlFileData::system_identifier, update_controlfile(), and wait_result_to_str().

Referenced by main().

◆ pg_ctl_status()

static void pg_ctl_status ( const char *  pg_ctl_cmd,
int  rc 
)
static

Definition at line 1403 of file pg_createsubscriber.c.

1404 {
1405  if (rc != 0)
1406  {
1407  if (WIFEXITED(rc))
1408  {
1409  pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
1410  }
1411  else if (WIFSIGNALED(rc))
1412  {
1413 #if defined(WIN32)
1414  pg_log_error("pg_ctl was terminated by exception 0x%X",
1415  WTERMSIG(rc));
1416  pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
1417 #else
1418  pg_log_error("pg_ctl was terminated by signal %d: %s",
1419  WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
1420 #endif
1421  }
1422  else
1423  {
1424  pg_log_error("pg_ctl exited with unrecognized status %d", rc);
1425  }
1426 
1427  pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
1428  exit(1);
1429  }
1430 }
const char * pg_strsignal(int signum)
Definition: pgstrsignal.c:39
#define WIFEXITED(w)
Definition: win32_port.h:152
#define WIFSIGNALED(w)
Definition: win32_port.h:153
#define WTERMSIG(w)
Definition: win32_port.h:155
#define WEXITSTATUS(w)
Definition: win32_port.h:154

References exit(), pg_log_error, pg_log_error_detail, pg_strsignal(), WEXITSTATUS, WIFEXITED, WIFSIGNALED, and WTERMSIG.

Referenced by start_standby_server(), and stop_standby_server().

◆ server_is_in_recovery()

static bool server_is_in_recovery ( PGconn * conn)
static

Definition at line 813 of file pg_createsubscriber.c.

814 {
815  PGresult *res;
816  int ret;
817 
818  res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()");
819 
820  if (PQresultStatus(res) != PGRES_TUPLES_OK)
821  {
822  pg_log_error("could not obtain recovery progress: %s",
823  PQresultErrorMessage(res));
824  disconnect_database(conn, true);
825  }
826 
827 
828  ret = strcmp("t", PQgetvalue(res, 0, 0));
829 
830  PQclear(res);
831 
832  return ret == 0;
833 }

References conn, disconnect_database(), pg_log_error, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetvalue(), PQresultErrorMessage(), PQresultStatus(), and res.

Referenced by check_publisher(), check_subscriber(), and wait_for_end_recovery().

◆ set_replication_progress()

static void set_replication_progress ( PGconn *  conn,
const struct LogicalRepInfo *  dbinfo,
const char *  lsn 
)
static

Definition at line 1739 of file pg_createsubscriber.c.

1740 {
1741  PQExpBuffer str = createPQExpBuffer();
1742  PGresult *res;
1743  Oid suboid;
1744  char *subname;
1745  char *dbname;
1746  char *originname;
1747  char *lsnstr;
1748 
1749  Assert(conn != NULL);
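1750 
1751  subname = PQescapeLiteral(conn, dbinfo->subname, strlen(dbinfo->subname));
1752  dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
1753 
1754  appendPQExpBuffer(str,
1755  "SELECT s.oid FROM pg_catalog.pg_subscription s "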
1756  "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
1757  "WHERE s.subname = %s AND d.datname = %s",
1758  subname, dbname);
1759 
1760  res = PQexec(conn, str->data);
1761  if (PQresultStatus(res) != PGRES_TUPLES_OK)
1762  {
1763  pg_log_error("could not obtain subscription OID: %s",
1764  PQresultErrorMessage(res));
1765  disconnect_database(conn, true);
1766  }
1767 
1768  if (PQntuples(res) != 1 && !dry_run)
1769  {
1770  pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
1771  PQntuples(res), 1);
1772  disconnect_database(conn, true);
1773  }
1774 
1775  if (dry_run)
1776  {
1777  suboid = InvalidOid;
1778  lsnstr = psprintf("%X/%X", LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1779  }
1780  else
1781  {
1782  suboid = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
1783  lsnstr = psprintf("%s", lsn);
1784  }
1785 
1786  PQclear(res);
1787 
1788  /*
1789  * The origin name is defined as pg_%u. %u is the subscription OID. See
1790  * ApplyWorkerMain().
1791  */
1792  originname = psprintf("pg_%u", suboid);
1793 
1794  pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
1795  originname, lsnstr, dbinfo->dbname);
1796 
1797  resetPQExpBuffer(str);
1798  appendPQExpBuffer(str,
1799  "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
1800  originname, lsnstr);
1801 
1802  pg_log_debug("command is: %s", str->data);
1803 
1804  if (!dry_run)
1805  {
1806  res = PQexec(conn, str->data);
1807  if (PQresultStatus(res) != PGRES_TUPLES_OK)
1808  {
1809  pg_log_error("could not set replication progress for subscription \"%s\": %s",
1810  dbinfo->subname, PQresultErrorMessage(res));
1811  disconnect_database(conn, true);
1812  }
1813  PQclear(res);
1814  }
1815 
1816  pg_free(subname);
1817  pg_free(dbname);
1818  pg_free(originname);
1819  pg_free(lsnstr);
1820  destroyPQExpBuffer(str);
1821 }
#define InvalidOid
Definition: postgres_ext.h:36
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28

References appendPQExpBuffer(), Assert, conn, createPQExpBuffer(), dbinfo, LogicalRepInfo::dbname, dbname, destroyPQExpBuffer(), disconnect_database(), dry_run, InvalidOid, InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_free(), pg_log_debug, pg_log_error, pg_log_info, PGRES_TUPLES_OK, PQclear(), PQescapeLiteral(), PQexec(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), psprintf(), res, resetPQExpBuffer(), str, LogicalRepInfo::subname, and subname.

Referenced by setup_subscriber().

◆ setup_publisher()

static char * setup_publisher ( struct LogicalRepInfo * dbinfo)
static

Definition at line 734 of file pg_createsubscriber.c.

735 {
736  char *lsn = NULL;
737 
738  pg_prng_seed(&prng_state, (uint64) (getpid() ^ time(NULL)));
739 
740  for (int i = 0; i < num_dbs; i++)
741  {
742  PGconn *conn;
743  char *genname = NULL;
744 
745  conn = connect_database(dbinfo[i].pubconninfo, true);
746 
747  /*
748  * If an object name was not specified as command-line options, assign
749  * a generated object name. The replication slot has a different rule.
750  * The subscription name is assigned to the replication slot name if
751  * no replication slot is specified. It follows the same rule as
752  * CREATE SUBSCRIPTION.
753  */
754  if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
755  genname = generate_object_name(conn);
756  if (num_pubs == 0)
757  dbinfo[i].pubname = pg_strdup(genname);
758  if (num_subs == 0)
759  dbinfo[i].subname = pg_strdup(genname);
760  if (num_replslots == 0)
761  dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
762 
763  /*
764  * Create publication on publisher. This step should be executed
765  * *before* promoting the subscriber to avoid any transactions between
766  * consistent LSN and the new publication rows (such transactions
767  * wouldn't see the new publication rows resulting in an error).
768  */
769  create_publication(conn, &dbinfo[i]);
770 
771  /* Create replication slot on publisher */
772  if (lsn)
773  pg_free(lsn);
774  lsn = create_logical_replication_slot(conn, &dbinfo[i]);
775  if (lsn != NULL || dry_run)
776  pg_log_info("create replication slot \"%s\" on publisher",
777  dbinfo[i].replslotname);
778  else
779  exit(1);
780 
781  /*
782  * Since we are using the LSN returned by the last replication slot as
783  * recovery_target_lsn, this LSN is ahead of the current WAL position
784  * and the recovery waits until the publisher writes a WAL record to
785  * reach the target and ends the recovery. On idle systems, this wait
786  * time is unpredictable and could lead to failure in promoting the
787  * subscriber. To avoid that, insert a harmless WAL record.
788  */
789  if (i == num_dbs - 1 && !dry_run)
790  {
791  PGresult *res;
792 
793  res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
794  if (PQresultStatus(res) != PGRES_TUPLES_OK)
795  {
796  pg_log_error("could not write an additional WAL record: %s",
797  PQresultErrorMessage(res));
798  disconnect_database(conn, true);
799  }
800  PQclear(res);
801  }
802 
803  disconnect_database(conn, false);
804  }
805 
806  return lsn;
807 }
static char * create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
static char * generate_object_name(PGconn *conn)
void pg_prng_seed(pg_prng_state *state, uint64 seed)
Definition: pg_prng.c:89

References conn, connect_database(), create_logical_replication_slot(), create_publication(), dbinfo, disconnect_database(), dry_run, exit(), generate_object_name(), i, num_dbs, num_pubs, num_replslots, num_subs, pg_free(), pg_log_error, pg_log_info, pg_prng_seed(), pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQresultErrorMessage(), PQresultStatus(), prng_state, LogicalRepInfo::pubname, LogicalRepInfo::replslotname, res, LogicalRepInfo::subname, and subname.

Referenced by main().

◆ setup_recovery()

static void setup_recovery ( const struct LogicalRepInfo *  dbinfo,
const char *  datadir,
const char *  lsn 
)
static

Definition at line 1182 of file pg_createsubscriber.c.

1183 {
1184  PGconn *conn;
1185  PQExpBuffer recoveryconfcontents;
1186 
1187  /*
1188  * Despite of the recovery parameters will be written to the subscriber,
1189  * use a publisher connection. The primary_conninfo is generated using the
1190  * connection settings.
1191  */
1192  conn = connect_database(dbinfo[0].pubconninfo, true);
1193 
1194  /*
1195  * Write recovery parameters.
1196  *
1197  * The subscriber is not running yet. In dry run mode, the recovery
1198  * parameters *won't* be written. An invalid LSN is used for printing
1199  * purposes. Additional recovery parameters are added here. It avoids
1200  * unexpected behavior such as end of recovery as soon as a consistent
1201  * state is reached (recovery_target) and failure due to multiple recovery
1202  * targets (name, time, xid, LSN).
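1203  */
1204  recoveryconfcontents = GenerateRecoveryConfig(conn, NULL, NULL);
1205  appendPQExpBuffer(recoveryconfcontents, "recovery_target = ''\n");
1206  appendPQExpBuffer(recoveryconfcontents,
1207  "recovery_target_timeline = 'latest'\n");
1208  appendPQExpBuffer(recoveryconfcontents,
1209  "recovery_target_inclusive = true\n");
1210  appendPQExpBuffer(recoveryconfcontents,
1211  "recovery_target_action = promote\n");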
1212  appendPQExpBuffer(recoveryconfcontents, "recovery_target_name = ''\n");
1213  appendPQExpBuffer(recoveryconfcontents, "recovery_target_time = ''\n");
1214  appendPQExpBuffer(recoveryconfcontents, "recovery_target_xid = ''\n");
1215 
1216  if (dry_run)
1217  {
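1218  appendPQExpBuffer(recoveryconfcontents, "# dry run mode");
1219  appendPQExpBuffer(recoveryconfcontents,
1220  "recovery_target_lsn = '%X/%X'\n",
1221  LSN_FORMAT_ARGS((XLogRecPtr) InvalidXLogRecPtr));
1222  }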
1223  else
1224  {
1225  appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n",
1226  lsn);
1227  WriteRecoveryConfig(conn, datadir, recoveryconfcontents);
1228  }
1229  disconnect_database(conn, false);
1230 
1231  pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
1232 }
static PQExpBuffer recoveryconfcontents
void WriteRecoveryConfig(PGconn *pgconn, const char *target_dir, PQExpBuffer contents)
Definition: recovery_gen.c:124
PQExpBuffer GenerateRecoveryConfig(PGconn *pgconn, const char *replication_slot, char *dbname)
Definition: recovery_gen.c:27

References appendPQExpBuffer(), conn, connect_database(), PQExpBufferData::data, datadir, dbinfo, disconnect_database(), dry_run, GenerateRecoveryConfig(), InvalidXLogRecPtr, LSN_FORMAT_ARGS, pg_log_debug, recoveryconfcontents, and WriteRecoveryConfig().

Referenced by main().

◆ setup_subscriber()

static void setup_subscriber ( struct LogicalRepInfo *  dbinfo,
const char *  consistent_lsn 
)
static

Definition at line 1142 of file pg_createsubscriber.c.

1143 {
1144  for (int i = 0; i < num_dbs; i++)
1145  {
1146  PGconn *conn;
1147 
1148  /* Connect to subscriber. */
1149  conn = connect_database(dbinfo[i].subconninfo, true);
1150 
1151  /*
1152  * We don't need the pre-existing subscriptions on the newly formed
1153  * subscriber. They can connect to other publisher nodes and either
1154  * get some unwarranted data or can lead to ERRORs in connecting to
1155  * such nodes.
1156  */
1157  check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
1158 
1159  /*
1160  * Since the publication was created before the consistent LSN, it is
1161  * available on the subscriber when the physical replica is promoted.
1162  * Remove publications from the subscriber because it has no use.
1163  */
1164  drop_publication(conn, &dbinfo[i]);
1165 
1166  create_subscription(conn, &dbinfo[i]);
1167 
1168  /* Set the replication progress to the correct LSN */
1169  set_replication_progress(conn, &dbinfo[i], consistent_lsn);
1170 
1171  /* Enable subscription */
1172  enable_subscription(conn, &dbinfo[i]);
1173 
1174  disconnect_database(conn, false);
1175  }
1176 }
static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void check_and_drop_existing_subscriptions(PGconn *conn, const struct LogicalRepInfo *dbinfo)
static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, const char *lsn)
static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)

References check_and_drop_existing_subscriptions(), conn, connect_database(), create_subscription(), dbinfo, disconnect_database(), drop_publication(), enable_subscription(), i, num_dbs, and set_replication_progress().

Referenced by main().

◆ start_standby_server()

static void start_standby_server ( const struct CreateSubscriberOptions *  opt,
bool  restricted_access,
bool  restrict_logical_worker 
)
static

Definition at line 1433 of file pg_createsubscriber.c.

1435 {
1436  PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
1437  int rc;
1438 
1439  appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D ", pg_ctl_path);
1440  appendShellString(pg_ctl_cmd, subscriber_dir);
1441  appendPQExpBuffer(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
1442  if (restricted_access)
1443  {
1444  appendPQExpBuffer(pg_ctl_cmd, " -o \"-p %s\"", opt->sub_port);
1445 #if !defined(WIN32)
1446 
1447  /*
1448  * An empty listen_addresses list means the server does not listen on
1449  * any IP interfaces; only Unix-domain sockets can be used to connect
1450  * to the server. Prevent external connections to minimize the chance
1451  * of failure.
1452  */
1453  appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c listen_addresses='' -c unix_socket_permissions=0700");
1454  if (opt->socket_dir)
1455  appendPQExpBuffer(pg_ctl_cmd, " -c unix_socket_directories='%s'",
1456  opt->socket_dir);
1457  appendPQExpBufferChar(pg_ctl_cmd, '"');
1458 #endif
1459  }
1460  if (opt->config_file != NULL)
1461  appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
1462  opt->config_file);
1463 
1464  /* Suppress to start logical replication if requested */
1465  if (restrict_logical_worker)
1466  appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
1467 
1468  pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
1469  rc = system(pg_ctl_cmd->data);
1470  pg_ctl_status(pg_ctl_cmd->data, rc);
1471  standby_running = true;
1472  destroyPQExpBuffer(pg_ctl_cmd);
1473  pg_log_info("server was started");
1474 }
static void pg_ctl_status(const char *pg_ctl_cmd, int rc)
void appendShellString(PQExpBuffer buf, const char *str)
Definition: string_utils.c:429

References appendPQExpBuffer(), appendPQExpBufferChar(), appendPQExpBufferStr(), appendShellString(), CreateSubscriberOptions::config_file, createPQExpBuffer(), PQExpBufferData::data, destroyPQExpBuffer(), pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, CreateSubscriberOptions::socket_dir, standby_running, CreateSubscriberOptions::sub_port, and subscriber_dir.

Referenced by main().

◆ stop_standby_server()

static void stop_standby_server ( const char *  datadir)
static

Definition at line 1477 of file pg_createsubscriber.c.

1478 {
1479  char *pg_ctl_cmd;
1480  int rc;
1481 
1482  pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
1483  datadir);
1484  pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
1485  rc = system(pg_ctl_cmd);
1486  pg_ctl_status(pg_ctl_cmd, rc);
1487  standby_running = false;
1488  pg_log_info("server was stopped");
1489 }

References datadir, pg_ctl_path, pg_ctl_status(), pg_log_debug, pg_log_info, psprintf(), and standby_running.

Referenced by cleanup_objects_atexit(), main(), and wait_for_end_recovery().

◆ store_pub_sub_info()

static struct LogicalRepInfo * store_pub_sub_info ( const struct CreateSubscriberOptions *  opt,
const char *  pub_base_conninfo,
const char *  sub_base_conninfo 
)
static

Definition at line 433 of file pg_createsubscriber.c.

436 {
437  struct LogicalRepInfo *dbinfo;
438  SimpleStringListCell *pubcell = NULL;
439  SimpleStringListCell *subcell = NULL;
440  SimpleStringListCell *replslotcell = NULL;
441  int i = 0;
442 
443  dbinfo = pg_malloc_array(struct LogicalRepInfo, num_dbs);
444 
445  if (num_pubs > 0)
446  pubcell = opt->pub_names.head;
447  if (num_subs > 0)
448  subcell = opt->sub_names.head;
449  if (num_replslots > 0)
450  replslotcell = opt->replslot_names.head;
451 
452  for (SimpleStringListCell *cell = opt->database_names.head; cell; cell = cell->next)
453  {
454  char *conninfo;
455 
456  /* Fill publisher attributes */
457  conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
458  dbinfo[i].pubconninfo = conninfo;
459  dbinfo[i].dbname = cell->val;
460  if (num_pubs > 0)
461  dbinfo[i].pubname = pubcell->val;
462  else
463  dbinfo[i].pubname = NULL;
464  if (num_replslots > 0)
465  dbinfo[i].replslotname = replslotcell->val;
466  else
467  dbinfo[i].replslotname = NULL;
468  dbinfo[i].made_replslot = false;
469  dbinfo[i].made_publication = false;
470  /* Fill subscriber attributes */
471  conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
472  dbinfo[i].subconninfo = conninfo;
473  if (num_subs > 0)
474  dbinfo[i].subname = subcell->val;
475  else
476  dbinfo[i].subname = NULL;
477  /* Other fields will be filled later */
478 
479  pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
480  dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
481  dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
482  dbinfo[i].pubconninfo);
483  pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
484  dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
485  dbinfo[i].subconninfo);
486 
487  if (num_pubs > 0)
488  pubcell = pubcell->next;
489  if (num_subs > 0)
490  subcell = subcell->next;
491  if (num_replslots > 0)
492  replslotcell = replslotcell->next;
493 
494  i++;
495  }
496 
497  return dbinfo;
498 }
#define pg_malloc_array(type, count)
Definition: fe_memutils.h:44
static char * concat_conninfo_dbname(const char *conninfo, const char *dbname)
char val[FLEXIBLE_ARRAY_MEMBER]
Definition: simple_list.h:37
struct SimpleStringListCell * next
Definition: simple_list.h:34

References concat_conninfo_dbname(), CreateSubscriberOptions::database_names, dbinfo, LogicalRepInfo::dbname, SimpleStringList::head, i, LogicalRepInfo::made_publication, LogicalRepInfo::made_replslot, SimpleStringListCell::next, num_dbs, num_pubs, num_replslots, num_subs, pg_log_debug, pg_malloc_array, CreateSubscriberOptions::pub_names, LogicalRepInfo::pubconninfo, LogicalRepInfo::pubname, CreateSubscriberOptions::replslot_names, LogicalRepInfo::replslotname, CreateSubscriberOptions::sub_names, LogicalRepInfo::subconninfo, LogicalRepInfo::subname, subname, and SimpleStringListCell::val.

Referenced by main().

◆ usage()

static void usage ( void  )
static

Definition at line 217 of file pg_createsubscriber.c.

218 {
219  printf(_("%s creates a new logical replica from a standby server.\n\n"),
220  progname);
221  printf(_("Usage:\n"));
222  printf(_(" %s [OPTION]...\n"), progname);
223  printf(_("\nOptions:\n"));
224  printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
225  printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
226  printf(_(" -n, --dry-run dry run, just show what would be done\n"));
227  printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
228  printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
229  printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n"));
230  printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n"));
231  printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n"));
232  printf(_(" -v, --verbose output verbose messages\n"));
233  printf(_(" --config-file=FILENAME use specified main server configuration\n"
234  " file when running target cluster\n"));
235  printf(_(" --publication=NAME publication name\n"));
236  printf(_(" --replication-slot=NAME replication slot name\n"));
237  printf(_(" --subscription=NAME subscription name\n"));
238  printf(_(" -V, --version output version information, then exit\n"));
239  printf(_(" -?, --help show this help, then exit\n"));
240  printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
241  printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
242 }
#define _(x)
Definition: elog.c:90
#define printf(...)
Definition: port.h:244

References _, DEFAULT_SUB_PORT, printf, and progname.

Referenced by main().

◆ wait_for_end_recovery()

static void wait_for_end_recovery ( const char *  conninfo,
const struct CreateSubscriberOptions *  opt 
)
static

Definition at line 1501 of file pg_createsubscriber.c.

1502 {
1503  PGconn *conn;
1504  int status = POSTMASTER_STILL_STARTING;
1505  int timer = 0;
1506 
1507  pg_log_info("waiting for the target server to reach the consistent state");
1508 
1509  conn = connect_database(conninfo, true);
1510 
1511  for (;;)
1512  {
1513  bool in_recovery = server_is_in_recovery(conn);
1514 
1515  /*
1516  * Does the recovery process finish? In dry run mode, there is no
1517  * recovery mode. Bail out as the recovery process has ended.
1518  */
1519  if (!in_recovery || dry_run)
1520  {
1521  status = POSTMASTER_READY;
1522  recovery_ended = true;
1523  break;
1524  }
1525 
1526  /* Bail out after recovery_timeout seconds if this option is set */
1527  if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
1528  {
1529  stop_standby_server(subscriber_dir);
1530  pg_log_error("recovery timed out");
1531  disconnect_database(conn, true);
1532  }
1533 
1534  /* Keep waiting */
1535  pg_usleep(WAIT_INTERVAL * USEC_PER_SEC);
1536 
1537  timer += WAIT_INTERVAL;
1538  }
1539 
1540  disconnect_database(conn, false);
1541 
1542  if (status == POSTMASTER_STILL_STARTING)
1543  pg_fatal("server did not end recovery");
1544 
1545  pg_log_info("target server reached the consistent state");
1546  pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
1547 }
#define pg_log_info_hint(...)
Definition: logging.h:130
#define WAIT_INTERVAL
#define USEC_PER_SEC
void pg_usleep(long microsec)
Definition: signal.c:53

References conn, connect_database(), disconnect_database(), dry_run, pg_fatal, pg_log_error, pg_log_info, pg_log_info_hint, pg_usleep(), POSTMASTER_READY, POSTMASTER_STILL_STARTING, recovery_ended, CreateSubscriberOptions::recovery_timeout, server_is_in_recovery(), stop_standby_server(), subscriber_dir, USEC_PER_SEC, and WAIT_INTERVAL.

Referenced by main().

Variable Documentation

◆ dbinfo

◆ dry_run

◆ num_dbs

◆ num_pubs

int num_pubs = 0
static

Definition at line 123 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_replslots

int num_replslots = 0
static

Definition at line 125 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ num_subs

int num_subs = 0
static

Definition at line 124 of file pg_createsubscriber.c.

Referenced by main(), setup_publisher(), and store_pub_sub_info().

◆ pg_ctl_path

char* pg_ctl_path = NULL
static

Definition at line 129 of file pg_createsubscriber.c.

Referenced by main(), start_standby_server(), and stop_standby_server().

◆ pg_resetwal_path

char* pg_resetwal_path = NULL
static

Definition at line 130 of file pg_createsubscriber.c.

Referenced by main(), and modify_subscriber_sysid().

◆ primary_slot_name

char* primary_slot_name = NULL
static

Definition at line 116 of file pg_createsubscriber.c.

Referenced by check_subscriber(), drop_primary_replication_slot(), and main().

◆ prng_state

pg_prng_state prng_state
static

Definition at line 127 of file pg_createsubscriber.c.

Referenced by generate_object_name(), and setup_publisher().

◆ progname

const char* progname
static

Definition at line 114 of file pg_createsubscriber.c.

Referenced by get_exec_path(), get_sub_conninfo(), main(), and usage().

◆ recovery_ended

bool recovery_ended = false
static

Definition at line 135 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and wait_for_end_recovery().

◆ standby_running

bool standby_running = false
static

◆ subscriber_dir

char* subscriber_dir = NULL
static

◆ success

bool success = false
static

Definition at line 119 of file pg_createsubscriber.c.

Referenced by cleanup_objects_atexit(), and main().