32 #define ERRCODE_DUPLICATE_OBJECT "42710"
40 #define MINIMUM_VERSION_FOR_SHOW_CMD 100000
45 #define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000
69 const char **keywords;
88 if (conn_opts == NULL)
91 for (conn_opt = conn_opts; conn_opt->
keyword != NULL; conn_opt++)
93 if (conn_opt->
val != NULL && conn_opt->
val[0] !=
'\0')
97 keywords =
pg_malloc0((argcount + 1) *
sizeof(*keywords));
104 keywords[
i] =
"dbname";
108 for (conn_opt = conn_opts; conn_opt->
keyword != NULL; conn_opt++)
110 if (conn_opt->
val != NULL && conn_opt->
val[0] !=
'\0')
120 keywords =
pg_malloc0((argcount + 1) *
sizeof(*keywords));
122 keywords[
i] =
"dbname";
127 keywords[
i] =
"replication";
130 keywords[
i] =
"fallback_application_name";
136 keywords[
i] =
"host";
142 keywords[
i] =
"user";
148 keywords[
i] =
"port";
163 need_password =
false;
169 keywords[
i] =
"password";
189 pg_fatal(
"could not connect to server");
197 need_password =
true;
200 while (need_password);
246 pg_log_error(
"could not determine server setting for \"integer_datetimes\"");
251 if (strcmp(tmpparam,
"on") != 0)
253 pg_log_error(
"\"integer_datetimes\" compile flag does not match server");
283 for (conn_opt = conn_opts; conn_opt->
keyword != NULL; conn_opt++)
285 if (strcmp(conn_opt->
keyword,
"dbname") == 0 &&
286 conn_opt->
val != NULL && conn_opt->
val[0] !=
'\0')
308 char *err_msg = NULL;
315 if (conn_opts == NULL)
330 if (conn_opts == NULL)
364 pg_log_error(
"could not send replication command \"%s\": %s",
372 pg_log_error(
"could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
380 if (sscanf(
PQgetvalue(
res, 0, 0),
"%d%2s", &xlog_val, xlog_unit) != 2)
390 if (strcmp(xlog_unit,
"MB") == 0)
391 multiplier = 1024 * 1024;
392 else if (strcmp(xlog_unit,
"GB") == 0)
393 multiplier = 1024 * 1024 * 1024;
401 "remote server reported invalid WAL segment size (%d bytes)",
438 pg_log_error(
"could not send replication command \"%s\": %s",
446 pg_log_error(
"could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
455 pg_log_error(
"group access flag could not be parsed: %s",
490 pg_log_error(
"could not send replication command \"%s\": %s",
498 pg_log_error(
"could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
510 if (starttli != NULL)
518 pg_log_error(
"could not parse write-ahead log location \"%s\"",
524 *
startpos = ((uint64) hi) << 32 | lo;
535 pg_log_error(
"could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
567 *restart_lsn = lsn_loc;
569 *restart_tli = tli_loc;
578 pg_log_error(
"could not send replication command \"%s\": %s",
587 pg_log_error(
"could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
599 pg_log_error(
"replication slot \"%s\" does not exist", slot_name);
610 pg_log_error(
"expected a physical replication slot, got type \"%s\" instead",
624 pg_log_error(
"could not parse restart_lsn \"%s\" for replication slot \"%s\"",
629 lsn_loc = ((uint64) hi) << 32 | lo;
640 *restart_lsn = lsn_loc;
642 *restart_tli = tli_loc;
653 bool is_temporary,
bool is_physical,
bool reserve_wal,
663 (!is_physical &&
plugin != NULL));
665 Assert(slot_name != NULL);
677 if (use_new_option_syntax)
694 if (use_new_option_syntax)
696 "SNAPSHOT",
"nothing");
699 "NOEXPORT_SNAPSHOT");
702 if (use_new_option_syntax)
705 if (query->
data[query->
len - 1] ==
'(')
708 query->
data[query->
len] =
'\0';
730 pg_log_error(
"could not send replication command \"%s\": %s",
741 pg_log_error(
"could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
765 Assert(slot_name != NULL);
775 pg_log_error(
"could not send replication command \"%s\": %s",
785 pg_log_error(
"could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
812 if (
buf->len > 0 &&
buf->data[
buf->len - 1] !=
'(')
814 if (use_new_option_syntax)
831 char *option_name,
char *option_value)
835 if (option_value != NULL)
837 size_t length = strlen(option_value);
838 char *escaped_value =
palloc(1 + 2 * length);
842 pfree(escaped_value);
854 char *option_name,
int32 option_value)
886 long *secs,
int *microsecs)
913 return (diff >= msec * INT64CONST(1000));
924 memcpy(
buf, &n64,
sizeof(n64));
935 memcpy(&n64,
buf,
sizeof(n64));
static Datum values[MAXATTR]
#define ngettext(s, p, n)
#define Assert(condition)
#define ALWAYS_SECURE_SEARCH_PATH_SQL
#define POSTGRES_EPOCH_JDATE
static void PGresult * res
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
int PQserverVersion(const PGconn *conn)
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
PQconninfoOption * PQconndefaults(void)
void PQconninfoFree(PQconninfoOption *connOptions)
int PQconnectionNeedsPassword(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
ConnStatusType PQstatus(const PGconn *conn)
void PQfinish(PGconn *conn)
size_t PQescapeStringConn(PGconn *conn, char *to, const char *from, size_t length, int *error)
ExecStatusType PQresultStatus(const PGresult *res)
int PQntuples(const PGresult *res)
PGresult * PQexec(PGconn *conn, const char *query)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
char * PQresultErrorField(const PGresult *res, int fieldcode)
int PQnfields(const PGresult *res)
void * pg_malloc0(size_t size)
char * pg_strdup(const char *in)
void SetDataDirectoryCreatePerm(int dataDirMode)
#define pg_log_error(...)
#define pg_log_error_detail(...)
void pfree(void *pointer)
#define DEFAULT_XLOG_SEG_SIZE
static bool slot_exists_ok
static const char * plugin
static XLogRecPtr startpos
PQExpBuffer createPQExpBuffer(void)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
void appendPQExpBufferChar(PQExpBuffer str, char ch)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
char * simple_prompt(const char *prompt, bool echo)
void AppendIntegerCommandOption(PQExpBuffer buf, bool use_new_option_syntax, char *option_name, int32 option_value)
bool RetrieveWalSegSize(PGconn *conn)
#define ERRCODE_DUPLICATE_OBJECT
#define MINIMUM_VERSION_FOR_SHOW_CMD
int64 fe_recvint64(char *buf)
char * GetDbnameFromConnectionOptions(void)
bool CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, bool is_temporary, bool is_physical, bool reserve_wal, bool slot_exists_ok, bool two_phase)
TimestampTz feGetCurrentTimestamp(void)
#define MINIMUM_VERSION_FOR_GROUP_ACCESS
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
void AppendPlainCommandOption(PQExpBuffer buf, bool use_new_option_syntax, char *option_name)
void AppendStringCommandOption(PQExpBuffer buf, bool use_new_option_syntax, char *option_name, char *option_value)
void fe_sendint64(int64 i, char *buf)
static char * FindDbnameInConnParams(PQconninfoOption *conn_opts)
PGconn * GetConnection(void)
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
bool GetSlotInformation(PGconn *conn, const char *slot_name, XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
static bool RetrieveDataDirCreatePerm(PGconn *conn)
bool DropReplicationSlot(PGconn *conn, const char *slot_name)
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
int gettimeofday(struct timeval *tp, void *tzp)
#define IsValidWalSegSize(size)
#define InvalidXLogRecPtr