PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
streamutil.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
4  * pg_recvlogical
5  *
6  * Author: Magnus Hagander <magnus@hagander.net>
7  *
8  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  * src/bin/pg_basebackup/streamutil.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/time.h>
18 #include <unistd.h>
19 
20 /* for ntohl/htonl */
21 #include <netinet/in.h>
22 #include <arpa/inet.h>
23 
24 /* local includes */
25 #include "receivelog.h"
26 #include "streamutil.h"
27 
28 #include "pqexpbuffer.h"
29 #include "common/fe_memutils.h"
30 #include "datatype/timestamp.h"
31 
32 #define ERRCODE_DUPLICATE_OBJECT "42710"
33 
34 const char *progname;
36 char *dbhost = NULL;
37 char *dbuser = NULL;
38 char *dbport = NULL;
39 char *dbname = NULL;
40 int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
41 static bool have_password = false;
42 static char password[100];
44 
45 /*
46  * Connect to the server. Returns a valid PGconn pointer if connected,
47  * or NULL on non-permanent error. On permanent error, the function will
48  * call exit(1) directly.
49  */
50 PGconn *
52 {
53  PGconn *tmpconn;
54  int argcount = 7; /* dbname, replication, fallback_app_name,
55  * host, user, port, password */
56  int i;
57  const char **keywords;
58  const char **values;
59  const char *tmpparam;
60  bool need_password;
61  PQconninfoOption *conn_opts = NULL;
62  PQconninfoOption *conn_opt;
63  char *err_msg = NULL;
64 
65  /* pg_recvlogical uses dbname only; others use connection_string only. */
67 
68  /*
69  * Merge the connection info inputs given in form of connection string,
70  * options and default values (dbname=replication, replication=true, etc.)
71  * Explicitly discard any dbname value in the connection string;
72  * otherwise, PQconnectdbParams() would interpret that value as being
73  * itself a connection string.
74  */
75  i = 0;
77  {
78  conn_opts = PQconninfoParse(connection_string, &err_msg);
79  if (conn_opts == NULL)
80  {
81  fprintf(stderr, "%s: %s", progname, err_msg);
82  exit(1);
83  }
84 
85  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
86  {
87  if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
88  strcmp(conn_opt->keyword, "dbname") != 0)
89  argcount++;
90  }
91 
92  keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
93  values = pg_malloc0((argcount + 1) * sizeof(*values));
94 
95  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
96  {
97  if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
98  strcmp(conn_opt->keyword, "dbname") != 0)
99  {
100  keywords[i] = conn_opt->keyword;
101  values[i] = conn_opt->val;
102  i++;
103  }
104  }
105  }
106  else
107  {
108  keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
109  values = pg_malloc0((argcount + 1) * sizeof(*values));
110  }
111 
112  keywords[i] = "dbname";
113  values[i] = dbname == NULL ? "replication" : dbname;
114  i++;
115  keywords[i] = "replication";
116  values[i] = dbname == NULL ? "true" : "database";
117  i++;
118  keywords[i] = "fallback_application_name";
119  values[i] = progname;
120  i++;
121 
122  if (dbhost)
123  {
124  keywords[i] = "host";
125  values[i] = dbhost;
126  i++;
127  }
128  if (dbuser)
129  {
130  keywords[i] = "user";
131  values[i] = dbuser;
132  i++;
133  }
134  if (dbport)
135  {
136  keywords[i] = "port";
137  values[i] = dbport;
138  i++;
139  }
140 
141  /* If -W was given, force prompt for password, but only the first time */
142  need_password = (dbgetpassword == 1 && !have_password);
143 
144  do
145  {
146  /* Get a new password if appropriate */
147  if (need_password)
148  {
149  simple_prompt("Password: ", password, sizeof(password), false);
150  have_password = true;
151  need_password = false;
152  }
153 
154  /* Use (or reuse, on a subsequent connection) password if we have it */
155  if (have_password)
156  {
157  keywords[i] = "password";
158  values[i] = password;
159  }
160  else
161  {
162  keywords[i] = NULL;
163  values[i] = NULL;
164  }
165 
166  tmpconn = PQconnectdbParams(keywords, values, true);
167 
168  /*
169  * If there is too little memory even to allocate the PGconn object
170  * and PQconnectdbParams returns NULL, we call exit(1) directly.
171  */
172  if (!tmpconn)
173  {
174  fprintf(stderr, _("%s: could not connect to server\n"),
175  progname);
176  exit(1);
177  }
178 
179  /* If we need a password and -w wasn't given, loop back and get one */
180  if (PQstatus(tmpconn) == CONNECTION_BAD &&
181  PQconnectionNeedsPassword(tmpconn) &&
182  dbgetpassword != -1)
183  {
184  PQfinish(tmpconn);
185  need_password = true;
186  }
187  }
188  while (need_password);
189 
190  if (PQstatus(tmpconn) != CONNECTION_OK)
191  {
192  fprintf(stderr, _("%s: could not connect to server: %s"),
193  progname, PQerrorMessage(tmpconn));
194  PQfinish(tmpconn);
195  free(values);
196  free(keywords);
197  if (conn_opts)
198  PQconninfoFree(conn_opts);
199  return NULL;
200  }
201 
202  /* Connection ok! */
203  free(values);
204  free(keywords);
205  if (conn_opts)
206  PQconninfoFree(conn_opts);
207 
208  /*
209  * Ensure we have the same value of integer_datetimes (now always "on") as
210  * the server we are connecting to.
211  */
212  tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
213  if (!tmpparam)
214  {
215  fprintf(stderr,
216  _("%s: could not determine server setting for integer_datetimes\n"),
217  progname);
218  PQfinish(tmpconn);
219  exit(1);
220  }
221 
222  if (strcmp(tmpparam, "on") != 0)
223  {
224  fprintf(stderr,
225  _("%s: integer_datetimes compile flag does not match server\n"),
226  progname);
227  PQfinish(tmpconn);
228  exit(1);
229  }
230 
231  return tmpconn;
232 }
233 
234 /*
235  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
236  * some result information if requested:
237  * - System identifier
238  * - Current timeline ID
239  * - Start LSN position
240  * - Database name (NULL in servers prior to 9.4)
241  */
242 bool
243 RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
244  XLogRecPtr *startpos, char **db_name)
245 {
246  PGresult *res;
247  uint32 hi,
248  lo;
249 
250  /* Check connection existence */
251  Assert(conn != NULL);
252 
253  res = PQexec(conn, "IDENTIFY_SYSTEM");
254  if (PQresultStatus(res) != PGRES_TUPLES_OK)
255  {
256  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
257  progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
258 
259  PQclear(res);
260  return false;
261  }
262  if (PQntuples(res) != 1 || PQnfields(res) < 3)
263  {
264  fprintf(stderr,
265  _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
266  progname, PQntuples(res), PQnfields(res), 1, 3);
267 
268  PQclear(res);
269  return false;
270  }
271 
272  /* Get system identifier */
273  if (sysid != NULL)
274  *sysid = pg_strdup(PQgetvalue(res, 0, 0));
275 
276  /* Get timeline ID to start streaming from */
277  if (starttli != NULL)
278  *starttli = atoi(PQgetvalue(res, 0, 1));
279 
280  /* Get LSN start position if necessary */
281  if (startpos != NULL)
282  {
283  if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
284  {
285  fprintf(stderr,
286  _("%s: could not parse write-ahead log location \"%s\"\n"),
287  progname, PQgetvalue(res, 0, 2));
288 
289  PQclear(res);
290  return false;
291  }
292  *startpos = ((uint64) hi) << 32 | lo;
293  }
294 
295  /* Get database name, only available in 9.4 and newer versions */
296  if (db_name != NULL)
297  {
298  *db_name = NULL;
299  if (PQserverVersion(conn) >= 90400)
300  {
301  if (PQnfields(res) < 4)
302  {
303  fprintf(stderr,
304  _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
305  progname, PQntuples(res), PQnfields(res), 1, 4);
306 
307  PQclear(res);
308  return false;
309  }
310  if (!PQgetisnull(res, 0, 3))
311  *db_name = pg_strdup(PQgetvalue(res, 0, 3));
312  }
313  }
314 
315  PQclear(res);
316  return true;
317 }
318 
319 /*
320  * Create a replication slot for the given connection. This function
321  * returns true in case of success.
322  */
323 bool
324 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
325  bool is_physical, bool slot_exists_ok)
326 {
327  PQExpBuffer query;
328  PGresult *res;
329 
330  query = createPQExpBuffer();
331 
332  Assert((is_physical && plugin == NULL) ||
333  (!is_physical && plugin != NULL));
334  Assert(slot_name != NULL);
335 
336  /* Build query */
337  if (is_physical)
338  appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
339  slot_name);
340  else
341  {
342  appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
343  slot_name, plugin);
344  if (PQserverVersion(conn) >= 100000)
345  /* pg_recvlogical doesn't use an exported snapshot, so suppress */
346  appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
347  }
348 
349  res = PQexec(conn, query->data);
350  if (PQresultStatus(res) != PGRES_TUPLES_OK)
351  {
352  const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
353 
354  if (slot_exists_ok &&
355  sqlstate &&
356  strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
357  {
358  destroyPQExpBuffer(query);
359  PQclear(res);
360  return true;
361  }
362  else
363  {
364  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
365  progname, query->data, PQerrorMessage(conn));
366 
367  destroyPQExpBuffer(query);
368  PQclear(res);
369  return false;
370  }
371  }
372 
373  if (PQntuples(res) != 1 || PQnfields(res) != 4)
374  {
375  fprintf(stderr,
376  _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
377  progname, slot_name,
378  PQntuples(res), PQnfields(res), 1, 4);
379 
380  destroyPQExpBuffer(query);
381  PQclear(res);
382  return false;
383  }
384 
385  destroyPQExpBuffer(query);
386  PQclear(res);
387  return true;
388 }
389 
390 /*
391  * Drop a replication slot for the given connection. This function
392  * returns true in case of success.
393  */
394 bool
395 DropReplicationSlot(PGconn *conn, const char *slot_name)
396 {
397  PQExpBuffer query;
398  PGresult *res;
399 
400  Assert(slot_name != NULL);
401 
402  query = createPQExpBuffer();
403 
404  /* Build query */
405  appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
406  slot_name);
407  res = PQexec(conn, query->data);
408  if (PQresultStatus(res) != PGRES_COMMAND_OK)
409  {
410  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
411  progname, query->data, PQerrorMessage(conn));
412 
413  destroyPQExpBuffer(query);
414  PQclear(res);
415  return false;
416  }
417 
418  if (PQntuples(res) != 0 || PQnfields(res) != 0)
419  {
420  fprintf(stderr,
421  _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
422  progname, slot_name,
423  PQntuples(res), PQnfields(res), 0, 0);
424 
425  destroyPQExpBuffer(query);
426  PQclear(res);
427  return false;
428  }
429 
430  destroyPQExpBuffer(query);
431  PQclear(res);
432  return true;
433 }
434 
435 
436 /*
437  * Frontend version of GetCurrentTimestamp(), since we are not linked with
438  * backend code.
439  */
442 {
444  struct timeval tp;
445 
446  gettimeofday(&tp, NULL);
447 
448  result = (TimestampTz) tp.tv_sec -
450  result = (result * USECS_PER_SEC) + tp.tv_usec;
451 
452  return result;
453 }
454 
455 /*
456  * Frontend version of TimestampDifference(), since we are not linked with
457  * backend code.
458  */
459 void
461  long *secs, int *microsecs)
462 {
463  TimestampTz diff = stop_time - start_time;
464 
465  if (diff <= 0)
466  {
467  *secs = 0;
468  *microsecs = 0;
469  }
470  else
471  {
472  *secs = (long) (diff / USECS_PER_SEC);
473  *microsecs = (int) (diff % USECS_PER_SEC);
474  }
475 }
476 
477 /*
478  * Frontend version of TimestampDifferenceExceeds(), since we are not
479  * linked with backend code.
480  */
481 bool
483  TimestampTz stop_time,
484  int msec)
485 {
486  TimestampTz diff = stop_time - start_time;
487 
488  return (diff >= msec * INT64CONST(1000));
489 }
490 
491 /*
492  * Converts an int64 to network byte order.
493  */
494 void
495 fe_sendint64(int64 i, char *buf)
496 {
497  uint32 n32;
498 
499  /* High order half first, since we're doing MSB-first */
500  n32 = (uint32) (i >> 32);
501  n32 = htonl(n32);
502  memcpy(&buf[0], &n32, 4);
503 
504  /* Now the low order half */
505  n32 = (uint32) i;
506  n32 = htonl(n32);
507  memcpy(&buf[4], &n32, 4);
508 }
509 
510 /*
511  * Converts an int64 from network byte order to native format.
512  */
513 int64
515 {
516  int64 result;
517  uint32 h32;
518  uint32 l32;
519 
520  memcpy(&h32, buf, 4);
521  memcpy(&l32, buf + 4, 4);
522  h32 = ntohl(h32);
523  l32 = ntohl(l32);
524 
525  result = h32;
526  result <<= 32;
527  result |= l32;
528 
529  return result;
530 }
static char password[100]
Definition: streamutil.c:42
static const char * plugin
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2681
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6097
int gettimeofday(struct timeval *tp, struct timezone *tzp)
Definition: gettimeofday.c:105
uint32 TimeLineID
Definition: xlogdefs.h:45
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:441
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:6062
#define USECS_PER_SEC
Definition: timestamp.h:94
int64 TimestampTz
Definition: timestamp.h:39
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:243
void PQfinish(PGconn *conn)
Definition: fe-connect.c:3630
return result
Definition: formatting.c:1633
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:6087
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:482
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
bool CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, bool is_physical, bool slot_exists_ok)
Definition: streamutil.c:324
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:529
static time_t start_time
Definition: pg_ctl.c:103
static bool slot_exists_ok
Definition: pg_receivewal.c:41
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:113
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:4717
char * connection_string
Definition: streamutil.c:35
#define SECS_PER_DAY
Definition: timestamp.h:86
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
PGconn * conn
Definition: streamutil.c:43
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:262
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:460
static char * buf
Definition: pg_test_fsync.c:66
void simple_prompt(const char *prompt, char *destination, size_t destlen, bool echo)
Definition: sprompt.c:37
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:5947
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
unsigned int uint32
Definition: c.h:268
int dbgetpassword
Definition: streamutil.c:40
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:71
char * dbport
Definition: streamutil.c:38
void PQclear(PGresult *res)
Definition: fe-exec.c:650
#define free(a)
Definition: header.h:65
static bool have_password
Definition: streamutil.c:41
const char * progname
Definition: streamutil.c:34
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2658
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
#define INT64CONST(x)
Definition: c.h:310
char * dbhost
Definition: streamutil.c:36
char * dbname
Definition: streamutil.c:39
static XLogRecPtr startpos
static Datum values[MAXATTR]
Definition: bootstrap.c:163
int PQconnectionNeedsPassword(const PGconn *conn)
Definition: fe-connect.c:6131
int64 fe_recvint64(char *buf)
Definition: streamutil.c:514
char * dbuser
Definition: streamutil.c:37
int i
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
bool DropReplicationSlot(PGconn *conn, const char *slot_name)
Definition: streamutil.c:395
#define UNIX_EPOCH_JDATE
Definition: timestamp.h:162
#define POSTGRES_EPOCH_JDATE
Definition: timestamp.h:163
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:495
PGconn * GetConnection(void)
Definition: streamutil.c:51
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3092
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6044
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:32
#define _(x)
Definition: elog.c:84