PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
streamutil.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * streamutil.c - utility functions for pg_basebackup and pg_receivelog
4  *
5  * Author: Magnus Hagander <magnus@hagander.net>
6  *
7  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8  *
9  * IDENTIFICATION
10  * src/bin/pg_basebackup/streamutil.c
11  *-------------------------------------------------------------------------
12  */
13 
14 #include "postgres_fe.h"
15 
16 #include <sys/time.h>
17 #include <unistd.h>
18 
19 /* for ntohl/htonl */
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
22 
23 /* local includes */
24 #include "receivelog.h"
25 #include "streamutil.h"
26 
27 #include "pqexpbuffer.h"
28 #include "common/fe_memutils.h"
29 #include "datatype/timestamp.h"
30 
31 #define ERRCODE_DUPLICATE_OBJECT "42710"
32 
33 const char *progname;
35 char *dbhost = NULL;
36 char *dbuser = NULL;
37 char *dbport = NULL;
38 char *dbname = NULL;
39 int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
40 static bool have_password = false;
41 static char password[100];
43 
44 /*
45  * Connect to the server. Returns a valid PGconn pointer if connected,
46  * or NULL on non-permanent error. On permanent error, the function will
47  * call exit(1) directly.
48  */
49 PGconn *
51 {
52  PGconn *tmpconn;
53  int argcount = 7; /* dbname, replication, fallback_app_name,
54  * host, user, port, password */
55  int i;
56  const char **keywords;
57  const char **values;
58  const char *tmpparam;
59  bool need_password;
60  PQconninfoOption *conn_opts = NULL;
61  PQconninfoOption *conn_opt;
62  char *err_msg = NULL;
63 
64  /* pg_recvlogical uses dbname only; others use connection_string only. */
66 
67  /*
68  * Merge the connection info inputs given in form of connection string,
69  * options and default values (dbname=replication, replication=true, etc.)
70  * Explicitly discard any dbname value in the connection string;
71  * otherwise, PQconnectdbParams() would interpret that value as being
72  * itself a connection string.
73  */
74  i = 0;
76  {
77  conn_opts = PQconninfoParse(connection_string, &err_msg);
78  if (conn_opts == NULL)
79  {
80  fprintf(stderr, "%s: %s", progname, err_msg);
81  exit(1);
82  }
83 
84  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
85  {
86  if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
87  strcmp(conn_opt->keyword, "dbname") != 0)
88  argcount++;
89  }
90 
91  keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
92  values = pg_malloc0((argcount + 1) * sizeof(*values));
93 
94  for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
95  {
96  if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
97  strcmp(conn_opt->keyword, "dbname") != 0)
98  {
99  keywords[i] = conn_opt->keyword;
100  values[i] = conn_opt->val;
101  i++;
102  }
103  }
104  }
105  else
106  {
107  keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
108  values = pg_malloc0((argcount + 1) * sizeof(*values));
109  }
110 
111  keywords[i] = "dbname";
112  values[i] = dbname == NULL ? "replication" : dbname;
113  i++;
114  keywords[i] = "replication";
115  values[i] = dbname == NULL ? "true" : "database";
116  i++;
117  keywords[i] = "fallback_application_name";
118  values[i] = progname;
119  i++;
120 
121  if (dbhost)
122  {
123  keywords[i] = "host";
124  values[i] = dbhost;
125  i++;
126  }
127  if (dbuser)
128  {
129  keywords[i] = "user";
130  values[i] = dbuser;
131  i++;
132  }
133  if (dbport)
134  {
135  keywords[i] = "port";
136  values[i] = dbport;
137  i++;
138  }
139 
140  /* If -W was given, force prompt for password, but only the first time */
141  need_password = (dbgetpassword == 1 && !have_password);
142 
143  do
144  {
145  /* Get a new password if appropriate */
146  if (need_password)
147  {
148  simple_prompt("Password: ", password, sizeof(password), false);
149  have_password = true;
150  need_password = false;
151  }
152 
153  /* Use (or reuse, on a subsequent connection) password if we have it */
154  if (have_password)
155  {
156  keywords[i] = "password";
157  values[i] = password;
158  }
159  else
160  {
161  keywords[i] = NULL;
162  values[i] = NULL;
163  }
164 
165  tmpconn = PQconnectdbParams(keywords, values, true);
166 
167  /*
168  * If there is too little memory even to allocate the PGconn object
169  * and PQconnectdbParams returns NULL, we call exit(1) directly.
170  */
171  if (!tmpconn)
172  {
173  fprintf(stderr, _("%s: could not connect to server\n"),
174  progname);
175  exit(1);
176  }
177 
178  /* If we need a password and -w wasn't given, loop back and get one */
179  if (PQstatus(tmpconn) == CONNECTION_BAD &&
180  PQconnectionNeedsPassword(tmpconn) &&
181  dbgetpassword != -1)
182  {
183  PQfinish(tmpconn);
184  need_password = true;
185  }
186  }
187  while (need_password);
188 
189  if (PQstatus(tmpconn) != CONNECTION_OK)
190  {
191  fprintf(stderr, _("%s: could not connect to server: %s"),
192  progname, PQerrorMessage(tmpconn));
193  PQfinish(tmpconn);
194  free(values);
195  free(keywords);
196  if (conn_opts)
197  PQconninfoFree(conn_opts);
198  return NULL;
199  }
200 
201  /* Connection ok! */
202  free(values);
203  free(keywords);
204  if (conn_opts)
205  PQconninfoFree(conn_opts);
206 
207  /*
208  * Ensure we have the same value of integer_datetimes (now always "on") as
209  * the server we are connecting to.
210  */
211  tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
212  if (!tmpparam)
213  {
214  fprintf(stderr,
215  _("%s: could not determine server setting for integer_datetimes\n"),
216  progname);
217  PQfinish(tmpconn);
218  exit(1);
219  }
220 
221  if (strcmp(tmpparam, "on") != 0)
222  {
223  fprintf(stderr,
224  _("%s: integer_datetimes compile flag does not match server\n"),
225  progname);
226  PQfinish(tmpconn);
227  exit(1);
228  }
229 
230  return tmpconn;
231 }
232 
233 /*
234  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
235  * some result information if requested:
236  * - System identifier
237  * - Current timeline ID
238  * - Start LSN position
239  * - Database name (NULL in servers prior to 9.4)
240  */
241 bool
242 RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
243  XLogRecPtr *startpos, char **db_name)
244 {
245  PGresult *res;
246  uint32 hi,
247  lo;
248 
249  /* Check connection existence */
250  Assert(conn != NULL);
251 
252  res = PQexec(conn, "IDENTIFY_SYSTEM");
253  if (PQresultStatus(res) != PGRES_TUPLES_OK)
254  {
255  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
256  progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
257 
258  PQclear(res);
259  return false;
260  }
261  if (PQntuples(res) != 1 || PQnfields(res) < 3)
262  {
263  fprintf(stderr,
264  _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
265  progname, PQntuples(res), PQnfields(res), 1, 3);
266 
267  PQclear(res);
268  return false;
269  }
270 
271  /* Get system identifier */
272  if (sysid != NULL)
273  *sysid = pg_strdup(PQgetvalue(res, 0, 0));
274 
275  /* Get timeline ID to start streaming from */
276  if (starttli != NULL)
277  *starttli = atoi(PQgetvalue(res, 0, 1));
278 
279  /* Get LSN start position if necessary */
280  if (startpos != NULL)
281  {
282  if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
283  {
284  fprintf(stderr,
285  _("%s: could not parse transaction log location \"%s\"\n"),
286  progname, PQgetvalue(res, 0, 2));
287 
288  PQclear(res);
289  return false;
290  }
291  *startpos = ((uint64) hi) << 32 | lo;
292  }
293 
294  /* Get database name, only available in 9.4 and newer versions */
295  if (db_name != NULL)
296  {
297  *db_name = NULL;
298  if (PQserverVersion(conn) >= 90400)
299  {
300  if (PQnfields(res) < 4)
301  {
302  fprintf(stderr,
303  _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
304  progname, PQntuples(res), PQnfields(res), 1, 4);
305 
306  PQclear(res);
307  return false;
308  }
309  if (!PQgetisnull(res, 0, 3))
310  *db_name = pg_strdup(PQgetvalue(res, 0, 3));
311  }
312  }
313 
314  PQclear(res);
315  return true;
316 }
317 
318 /*
319  * Create a replication slot for the given connection. This function
320  * returns true in case of success.
321  */
322 bool
323 CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
324  bool is_physical, bool slot_exists_ok)
325 {
326  PQExpBuffer query;
327  PGresult *res;
328 
329  query = createPQExpBuffer();
330 
331  Assert((is_physical && plugin == NULL) ||
332  (!is_physical && plugin != NULL));
333  Assert(slot_name != NULL);
334 
335  /* Build query */
336  if (is_physical)
337  appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL",
338  slot_name);
339  else
340  {
341  appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
342  slot_name, plugin);
343  if (PQserverVersion(conn) >= 100000)
344  /* pg_recvlogical doesn't use an exported snapshot, so suppress */
345  appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT");
346  }
347 
348  res = PQexec(conn, query->data);
349  if (PQresultStatus(res) != PGRES_TUPLES_OK)
350  {
351  const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
352 
353  if (slot_exists_ok &&
354  sqlstate &&
355  strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
356  {
357  destroyPQExpBuffer(query);
358  PQclear(res);
359  return true;
360  }
361  else
362  {
363  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
364  progname, query->data, PQerrorMessage(conn));
365 
366  destroyPQExpBuffer(query);
367  PQclear(res);
368  return false;
369  }
370  }
371 
372  if (PQntuples(res) != 1 || PQnfields(res) != 4)
373  {
374  fprintf(stderr,
375  _("%s: could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
376  progname, slot_name,
377  PQntuples(res), PQnfields(res), 1, 4);
378 
379  destroyPQExpBuffer(query);
380  PQclear(res);
381  return false;
382  }
383 
384  destroyPQExpBuffer(query);
385  PQclear(res);
386  return true;
387 }
388 
389 /*
390  * Drop a replication slot for the given connection. This function
391  * returns true in case of success.
392  */
393 bool
394 DropReplicationSlot(PGconn *conn, const char *slot_name)
395 {
396  PQExpBuffer query;
397  PGresult *res;
398 
399  Assert(slot_name != NULL);
400 
401  query = createPQExpBuffer();
402 
403  /* Build query */
404  appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
405  slot_name);
406  res = PQexec(conn, query->data);
407  if (PQresultStatus(res) != PGRES_COMMAND_OK)
408  {
409  fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
410  progname, query->data, PQerrorMessage(conn));
411 
412  destroyPQExpBuffer(query);
413  PQclear(res);
414  return false;
415  }
416 
417  if (PQntuples(res) != 0 || PQnfields(res) != 0)
418  {
419  fprintf(stderr,
420  _("%s: could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields\n"),
421  progname, slot_name,
422  PQntuples(res), PQnfields(res), 0, 0);
423 
424  destroyPQExpBuffer(query);
425  PQclear(res);
426  return false;
427  }
428 
429  destroyPQExpBuffer(query);
430  PQclear(res);
431  return true;
432 }
433 
434 
435 /*
436  * Frontend version of GetCurrentTimestamp(), since we are not linked with
437  * backend code.
438  */
441 {
443  struct timeval tp;
444 
445  gettimeofday(&tp, NULL);
446 
447  result = (TimestampTz) tp.tv_sec -
449  result = (result * USECS_PER_SEC) + tp.tv_usec;
450 
451  return result;
452 }
453 
454 /*
455  * Frontend version of TimestampDifference(), since we are not linked with
456  * backend code.
457  */
458 void
460  long *secs, int *microsecs)
461 {
462  TimestampTz diff = stop_time - start_time;
463 
464  if (diff <= 0)
465  {
466  *secs = 0;
467  *microsecs = 0;
468  }
469  else
470  {
471  *secs = (long) (diff / USECS_PER_SEC);
472  *microsecs = (int) (diff % USECS_PER_SEC);
473  }
474 }
475 
476 /*
477  * Frontend version of TimestampDifferenceExceeds(), since we are not
478  * linked with backend code.
479  */
480 bool
482  TimestampTz stop_time,
483  int msec)
484 {
485  TimestampTz diff = stop_time - start_time;
486 
487  return (diff >= msec * INT64CONST(1000));
488 }
489 
490 /*
491  * Converts an int64 to network byte order.
492  */
493 void
494 fe_sendint64(int64 i, char *buf)
495 {
496  uint32 n32;
497 
498  /* High order half first, since we're doing MSB-first */
499  n32 = (uint32) (i >> 32);
500  n32 = htonl(n32);
501  memcpy(&buf[0], &n32, 4);
502 
503  /* Now the low order half */
504  n32 = (uint32) i;
505  n32 = htonl(n32);
506  memcpy(&buf[4], &n32, 4);
507 }
508 
509 /*
510  * Converts an int64 from network byte order to native format.
511  */
512 int64
514 {
515  int64 result;
516  uint32 h32;
517  uint32 l32;
518 
519  memcpy(&h32, buf, 4);
520  memcpy(&l32, buf + 4, 4);
521  h32 = ntohl(h32);
522  l32 = ntohl(l32);
523 
524  result = h32;
525  result <<= 32;
526  result |= l32;
527 
528  return result;
529 }
static char password[100]
Definition: streamutil.c:41
static const char * plugin
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2681
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:5934
int gettimeofday(struct timeval *tp, struct timezone *tzp)
Definition: gettimeofday.c:105
uint32 TimeLineID
Definition: xlogdefs.h:45
TimestampTz feGetCurrentTimestamp(void)
Definition: streamutil.c:440
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3067
const char * PQparameterStatus(const PGconn *conn, const char *paramName)
Definition: fe-connect.c:5899
#define USECS_PER_SEC
Definition: timestamp.h:94
int64 TimestampTz
Definition: timestamp.h:39
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
Definition: streamutil.c:242
void PQfinish(PGconn *conn)
Definition: fe-connect.c:3491
return result
Definition: formatting.c:1618
int PQserverVersion(const PGconn *conn)
Definition: fe-connect.c:5924
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2673
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: streamutil.c:481
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2596
bool CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, bool is_physical, bool slot_exists_ok)
Definition: streamutil.c:323
PGconn * PQconnectdbParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:466
static time_t start_time
Definition: pg_ctl.c:91
static bool slot_exists_ok
Definition: pg_receivewal.c:41
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:113
PQconninfoOption * PQconninfoParse(const char *conninfo, char **errmsg)
Definition: fe-connect.c:4578
char * connection_string
Definition: streamutil.c:34
#define SECS_PER_DAY
Definition: timestamp.h:86
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
PGconn * conn
Definition: streamutil.c:42
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:262
void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: streamutil.c:459
static char * buf
Definition: pg_test_fsync.c:66
void simple_prompt(const char *prompt, char *destination, size_t destlen, bool echo)
Definition: sprompt.c:37
void PQconninfoFree(PQconninfoOption *connOptions)
Definition: fe-connect.c:5784
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
unsigned int uint32
Definition: c.h:268
int dbgetpassword
Definition: streamutil.c:39
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:71
char * dbport
Definition: streamutil.c:37
void PQclear(PGresult *res)
Definition: fe-exec.c:650
#define free(a)
Definition: header.h:65
static bool have_password
Definition: streamutil.c:40
const char * progname
Definition: streamutil.c:33
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:2658
#define NULL
Definition: c.h:229
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define Assert(condition)
Definition: c.h:675
#define INT64CONST(x)
Definition: c.h:310
char * dbhost
Definition: streamutil.c:35
char * dbname
Definition: streamutil.c:38
static XLogRecPtr startpos
static Datum values[MAXATTR]
Definition: bootstrap.c:163
int PQconnectionNeedsPassword(const PGconn *conn)
Definition: fe-connect.c:5968
int64 fe_recvint64(char *buf)
Definition: streamutil.c:513
char * dbuser
Definition: streamutil.c:36
int i
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1846
bool DropReplicationSlot(PGconn *conn, const char *slot_name)
Definition: streamutil.c:394
#define UNIX_EPOCH_JDATE
Definition: timestamp.h:162
#define POSTGRES_EPOCH_JDATE
Definition: timestamp.h:163
void fe_sendint64(int64 i, char *buf)
Definition: streamutil.c:494
PGconn * GetConnection(void)
Definition: streamutil.c:50
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3092
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:5881
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:31
#define _(x)
Definition: elog.c:84