PostgreSQL Source Code  git master
libpq_fetch.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * libpq_fetch.c
4  * Functions for fetching files from a remote server.
5  *
6  * Copyright (c) 2013-2019, PostgreSQL Global Development Group
7  *
8  *-------------------------------------------------------------------------
9  */
10 #include "postgres_fe.h"
11 
12 #include <sys/stat.h>
13 #include <dirent.h>
14 #include <fcntl.h>
15 #include <unistd.h>
16 
17 #include "pg_rewind.h"
18 #include "datapagemap.h"
19 #include "fetch.h"
20 #include "file_ops.h"
21 #include "filemap.h"
22 
23 #include "libpq-fe.h"
24 #include "catalog/pg_type_d.h"
25 #include "fe_utils/connect.h"
26 #include "port/pg_bswap.h"
27 
/*
 * Connection to the source server.  It is assigned by libpqConnect() and
 * used by every query helper in this file.
 */
28 static PGconn *conn = NULL;
29 
30 /*
31  * Files are fetched max CHUNKSIZE bytes at a time.
32  *
33  * (This only applies to files that are copied in whole, or for truncated
34  * files where we copy the tail. Relation files, where we know the individual
35  * blocks that need to be fetched, are fetched in BLCKSZ chunks.)
 *
 * fetch_file_range() stores a chunk length in an unsigned int, so this
 * value must stay within that type's range.
36  */
37 #define CHUNKSIZE 1000000
38 
/* Forward declarations of file-local helpers defined further down. */
39 static void receiveFileChunks(const char *sql);
40 static void execute_pagemap(datapagemap_t *pagemap, const char *path);
41 static char *run_simple_query(const char *sql);
42 static void run_simple_command(const char *sql);
43 
44 void
45 libpqConnect(const char *connstr)
46 {
47  char *str;
48  PGresult *res;
49 
50  conn = PQconnectdb(connstr);
51  if (PQstatus(conn) == CONNECTION_BAD)
52  pg_fatal("could not connect to server: %s",
53  PQerrorMessage(conn));
54 
55  if (showprogress)
56  pg_log_info("connected to server");
57 
58  /* disable all types of timeouts */
59  run_simple_command("SET statement_timeout = 0");
60  run_simple_command("SET lock_timeout = 0");
61  run_simple_command("SET idle_in_transaction_session_timeout = 0");
62 
64  if (PQresultStatus(res) != PGRES_TUPLES_OK)
65  pg_fatal("could not clear search_path: %s",
67  PQclear(res);
68 
69  /*
70  * Check that the server is not in hot standby mode. There is no
71  * fundamental reason that couldn't be made to work, but it doesn't
72  * currently because we use a temporary table. Better to check for it
73  * explicitly than error out, for a better error message.
74  */
75  str = run_simple_query("SELECT pg_is_in_recovery()");
76  if (strcmp(str, "f") != 0)
77  pg_fatal("source server must not be in recovery mode");
78  pg_free(str);
79 
80  /*
81  * Also check that full_page_writes is enabled. We can get torn pages if
82  * a page is modified while we read it with pg_read_binary_file(), and we
83  * rely on full page images to fix them.
84  */
85  str = run_simple_query("SHOW full_page_writes");
86  if (strcmp(str, "on") != 0)
87  pg_fatal("full_page_writes must be enabled in the source server");
88  pg_free(str);
89 
90  /*
91  * Although we don't do any "real" updates, we do work with a temporary
92  * table. We don't care about synchronous commit for that. It doesn't
93  * otherwise matter much, but if the server is using synchronous
94  * replication, and replication isn't working for some reason, we don't
95  * want to get stuck, waiting for it to start working again.
96  */
97  run_simple_command("SET synchronous_commit = off");
98 }
99 
100 /*
101  * Runs a query that returns a single value.
102  * The result should be pg_free'd after use.
103  */
104 static char *
105 run_simple_query(const char *sql)
106 {
107  PGresult *res;
108  char *result;
109 
110  res = PQexec(conn, sql);
111 
112  if (PQresultStatus(res) != PGRES_TUPLES_OK)
113  pg_fatal("error running query (%s) in source server: %s",
114  sql, PQresultErrorMessage(res));
115 
116  /* sanity check the result set */
117  if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
118  pg_fatal("unexpected result set from query");
119 
120  result = pg_strdup(PQgetvalue(res, 0, 0));
121 
122  PQclear(res);
123 
124  return result;
125 }
126 
127 /*
128  * Runs a command.
129  * In the event of a failure, exit immediately.
130  */
131 static void
132 run_simple_command(const char *sql)
133 {
134  PGresult *res;
135 
136  res = PQexec(conn, sql);
137 
138  if (PQresultStatus(res) != PGRES_COMMAND_OK)
139  pg_fatal("error running query (%s) in source server: %s",
140  sql, PQresultErrorMessage(res));
141 
142  PQclear(res);
143 }
144 
145 /*
146  * Calls pg_current_wal_insert_lsn() function
147  */
150 {
151  XLogRecPtr result;
152  uint32 hi;
153  uint32 lo;
154  char *val;
155 
156  val = run_simple_query("SELECT pg_current_wal_insert_lsn()");
157 
158  if (sscanf(val, "%X/%X", &hi, &lo) != 2)
159  pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
160 
161  result = ((uint64) hi) << 32 | lo;
162 
163  pg_free(val);
164 
165  return result;
166 }
167 
168 /*
169  * Get a list of all files in the data directory.
170  */
171 void
173 {
174  PGresult *res;
175  const char *sql;
176  int i;
177 
178  /*
179  * Create a recursive directory listing of the whole data directory.
180  *
181  * The WITH RECURSIVE part does most of the work. The second part gets the
182  * targets of the symlinks in pg_tblspc directory.
183  *
184  * XXX: There is no backend function to get a symbolic link's target in
185  * general, so if the admin has put any custom symbolic links in the data
186  * directory, they won't be copied correctly.
187  */
188  sql =
189  "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
190  " SELECT '' AS path, filename, size, isdir FROM\n"
191  " (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
192  " pg_stat_file(fn.filename, true) AS this\n"
193  " UNION ALL\n"
194  " SELECT parent.path || parent.filename || '/' AS path,\n"
195  " fn, this.size, this.isdir\n"
196  " FROM files AS parent,\n"
197  " pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
198  " pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
199  " WHERE parent.isdir = 't'\n"
200  ")\n"
201  "SELECT path || filename, size, isdir,\n"
202  " pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
203  "FROM files\n"
204  "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
205  " AND oid::text = files.filename\n";
206  res = PQexec(conn, sql);
207 
208  if (PQresultStatus(res) != PGRES_TUPLES_OK)
209  pg_fatal("could not fetch file list: %s",
210  PQresultErrorMessage(res));
211 
212  /* sanity check the result set */
213  if (PQnfields(res) != 4)
214  pg_fatal("unexpected result set while fetching file list");
215 
216  /* Read result to local variables */
217  for (i = 0; i < PQntuples(res); i++)
218  {
219  char *path = PQgetvalue(res, i, 0);
220  int64 filesize = atol(PQgetvalue(res, i, 1));
221  bool isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
222  char *link_target = PQgetvalue(res, i, 3);
224 
225  if (PQgetisnull(res, 0, 1))
226  {
227  /*
228  * The file was removed from the server while the query was
229  * running. Ignore it.
230  */
231  continue;
232  }
233 
234  if (link_target[0])
235  type = FILE_TYPE_SYMLINK;
236  else if (isdir)
237  type = FILE_TYPE_DIRECTORY;
238  else
239  type = FILE_TYPE_REGULAR;
240 
241  process_source_file(path, type, filesize, link_target);
242  }
243  PQclear(res);
244 }
245 
246 /*----
247  * Runs a query, which returns pieces of files from the remote source data
248  * directory, and overwrites the corresponding parts of target files with
249  * the received parts. The result set is expected to be of format:
250  *
251  * path text -- path in the data directory, e.g "base/1/123"
252  * begin int8 -- offset within the file
253  * chunk bytea -- file content
254  *----
255  */
256 static void
257 receiveFileChunks(const char *sql)
258 {
259  PGresult *res;
260 
261  if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
262  pg_fatal("could not send query: %s", PQerrorMessage(conn));
263 
264  pg_log_debug("getting file chunks");
265 
266  if (PQsetSingleRowMode(conn) != 1)
267  pg_fatal("could not set libpq connection to single row mode");
268 
269  while ((res = PQgetResult(conn)) != NULL)
270  {
271  char *filename;
272  int filenamelen;
273  int64 chunkoff;
274  int chunksize;
275  char *chunk;
276 
277  switch (PQresultStatus(res))
278  {
279  case PGRES_SINGLE_TUPLE:
280  break;
281 
282  case PGRES_TUPLES_OK:
283  PQclear(res);
284  continue; /* final zero-row result */
285 
286  default:
287  pg_fatal("unexpected result while fetching remote files: %s",
288  PQresultErrorMessage(res));
289  }
290 
291  /* sanity check the result set */
292  if (PQnfields(res) != 3 || PQntuples(res) != 1)
293  pg_fatal("unexpected result set size while fetching remote files");
294 
295  if (PQftype(res, 0) != TEXTOID ||
296  PQftype(res, 1) != INT8OID ||
297  PQftype(res, 2) != BYTEAOID)
298  {
299  pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u",
300  PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
301  }
302 
303  if (PQfformat(res, 0) != 1 &&
304  PQfformat(res, 1) != 1 &&
305  PQfformat(res, 2) != 1)
306  {
307  pg_fatal("unexpected result format while fetching remote files");
308  }
309 
310  if (PQgetisnull(res, 0, 0) ||
311  PQgetisnull(res, 0, 1))
312  {
313  pg_fatal("unexpected null values in result while fetching remote files");
314  }
315 
316  if (PQgetlength(res, 0, 1) != sizeof(int64))
317  pg_fatal("unexpected result length while fetching remote files");
318 
319  /* Read result set to local variables */
320  memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
321  chunkoff = pg_ntoh64(chunkoff);
322  chunksize = PQgetlength(res, 0, 2);
323 
324  filenamelen = PQgetlength(res, 0, 0);
325  filename = pg_malloc(filenamelen + 1);
326  memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
327  filename[filenamelen] = '\0';
328 
329  chunk = PQgetvalue(res, 0, 2);
330 
331  /*
332  * If a file has been deleted on the source, remove it on the target
333  * as well. Note that multiple unlink() calls may happen on the same
334  * file if multiple data chunks are associated with it, hence ignore
335  * unconditionally anything missing. If this file is not a relation
336  * data file, then it has been already truncated when creating the
337  * file chunk list at the previous execution of the filemap.
338  */
339  if (PQgetisnull(res, 0, 2))
340  {
341  pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
342  filename);
343  remove_target_file(filename, true);
344  pg_free(filename);
345  PQclear(res);
346  continue;
347  }
348 
349  pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
350  filename, (long long int) chunkoff, chunksize);
351 
352  open_target_file(filename, false);
353 
354  write_target_range(chunk, chunkoff, chunksize);
355 
356  pg_free(filename);
357 
358  PQclear(res);
359  }
360 }
361 
362 /*
363  * Receive a single file as a malloc'd buffer.
364  */
365 char *
366 libpqGetFile(const char *filename, size_t *filesize)
367 {
368  PGresult *res;
369  char *result;
370  int len;
371  const char *paramValues[1];
372 
373  paramValues[0] = filename;
374  res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
375  1, NULL, paramValues, NULL, NULL, 1);
376 
377  if (PQresultStatus(res) != PGRES_TUPLES_OK)
378  pg_fatal("could not fetch remote file \"%s\": %s",
379  filename, PQresultErrorMessage(res));
380 
381  /* sanity check the result set */
382  if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
383  pg_fatal("unexpected result set while fetching remote file \"%s\"",
384  filename);
385 
386  /* Read result to local variables */
387  len = PQgetlength(res, 0, 0);
388  result = pg_malloc(len + 1);
389  memcpy(result, PQgetvalue(res, 0, 0), len);
390  result[len] = '\0';
391 
392  PQclear(res);
393 
394  pg_log_debug("fetched file \"%s\", length %d", filename, len);
395 
396  if (filesize)
397  *filesize = len;
398  return result;
399 }
400 
401 /*
402  * Write a file range to a temporary table in the server.
403  *
404  * The range is sent to the server as a COPY formatted line, to be inserted
405  * into the 'fetchchunks' temporary table. It is used in receiveFileChunks()
406  * function to actually fetch the data.
407  */
408 static void
409 fetch_file_range(const char *path, uint64 begin, uint64 end)
410 {
411  char linebuf[MAXPGPATH + 23];
412 
413  /* Split the range into CHUNKSIZE chunks */
414  while (end - begin > 0)
415  {
416  unsigned int len;
417 
418  /* Fine as long as CHUNKSIZE is not bigger than UINT32_MAX */
419  if (end - begin > CHUNKSIZE)
420  len = CHUNKSIZE;
421  else
422  len = (unsigned int) (end - begin);
423 
424  snprintf(linebuf, sizeof(linebuf), "%s\t" UINT64_FORMAT "\t%u\n", path, begin, len);
425 
426  if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
427  pg_fatal("could not send COPY data: %s",
428  PQerrorMessage(conn));
429 
430  begin += len;
431  }
432 }
433 
434 /*
435  * Fetch all changed blocks from remote source data directory.
436  */
437 void
439 {
440  file_entry_t *entry;
441  const char *sql;
442  PGresult *res;
443  int i;
444 
445  /*
446  * First create a temporary table, and load it with the blocks that we
447  * need to fetch.
448  */
449  sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int8, len int4);";
450  run_simple_command(sql);
451 
452  sql = "COPY fetchchunks FROM STDIN";
453  res = PQexec(conn, sql);
454 
455  if (PQresultStatus(res) != PGRES_COPY_IN)
456  pg_fatal("could not send file list: %s",
457  PQresultErrorMessage(res));
458  PQclear(res);
459 
460  for (i = 0; i < map->narray; i++)
461  {
462  entry = map->array[i];
463 
464  /* If this is a relation file, copy the modified blocks */
465  execute_pagemap(&entry->pagemap, entry->path);
466 
467  switch (entry->action)
468  {
469  case FILE_ACTION_NONE:
470  /* nothing else to do */
471  break;
472 
473  case FILE_ACTION_COPY:
474  /* Truncate the old file out of the way, if any */
475  open_target_file(entry->path, true);
476  fetch_file_range(entry->path, 0, entry->newsize);
477  break;
478 
480  truncate_target_file(entry->path, entry->newsize);
481  break;
482 
484  fetch_file_range(entry->path, entry->oldsize, entry->newsize);
485  break;
486 
487  case FILE_ACTION_REMOVE:
488  remove_target(entry);
489  break;
490 
491  case FILE_ACTION_CREATE:
492  create_target(entry);
493  break;
494  }
495  }
496 
497  if (PQputCopyEnd(conn, NULL) != 1)
498  pg_fatal("could not send end-of-COPY: %s",
499  PQerrorMessage(conn));
500 
501  while ((res = PQgetResult(conn)) != NULL)
502  {
503  if (PQresultStatus(res) != PGRES_COMMAND_OK)
504  pg_fatal("unexpected result while sending file list: %s",
505  PQresultErrorMessage(res));
506  PQclear(res);
507  }
508 
509  /*
510  * We've now copied the list of file ranges that we need to fetch to the
511  * temporary table. Now, actually fetch all of those ranges.
512  */
513  sql =
514  "SELECT path, begin,\n"
515  " pg_read_binary_file(path, begin, len, true) AS chunk\n"
516  "FROM fetchchunks\n";
517 
518  receiveFileChunks(sql);
519 }
520 
521 static void
522 execute_pagemap(datapagemap_t *pagemap, const char *path)
523 {
525  BlockNumber blkno;
526  off_t offset;
527 
528  iter = datapagemap_iterate(pagemap);
529  while (datapagemap_next(iter, &blkno))
530  {
531  offset = blkno * BLCKSZ;
532 
533  fetch_file_range(path, offset, offset + BLCKSZ);
534  }
535  pg_free(iter);
536 }
int PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
Definition: fe-exec.c:2318
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3175
int PQnfields(const PGresult *res)
Definition: fe-exec.c:2778
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6561
void libpqProcessFileList(void)
Definition: libpq_fetch.c:172
void open_target_file(const char *path, bool trunc)
Definition: file_ops.c:42
void write_target_range(char *buf, off_t begin, size_t size)
Definition: file_ops.c:83
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1286
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3164
file_entry_t ** array
Definition: filemap.h:79
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
void remove_target_file(const char *path, bool missing_ok)
Definition: file_ops.c:172
size_t newsize
Definition: filemap.h:51
#define pg_fatal(...)
Definition: pg_rewind.h:39
static void execute_pagemap(datapagemap_t *pagemap, const char *path)
Definition: libpq_fetch.c:522
int PQputCopyEnd(PGconn *conn, const char *errormsg)
Definition: fe-exec.c:2385
uint32 BlockNumber
Definition: block.h:31
#define CHUNKSIZE
Definition: libpq_fetch.c:37
int PQntuples(const PGresult *res)
Definition: fe-exec.c:2770
datapagemap_t pagemap
Definition: filemap.h:54
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2693
int narray
Definition: filemap.h:80
void truncate_target_file(const char *path, off_t newsize)
Definition: file_ops.c:191
static char * run_simple_query(const char *sql)
Definition: libpq_fetch.c:105
bool datapagemap_next(datapagemap_iterator_t *iter, BlockNumber *blkno)
Definition: datapagemap.c:88
#define pg_log_debug(...)
Definition: logging.h:91
#define MAXPGPATH
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3008
file_action_t action
Definition: filemap.h:47
static void receiveFileChunks(const char *sql)
Definition: libpq_fetch.c:257
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1678
size_t oldsize
Definition: filemap.h:50
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
unsigned int uint32
Definition: c.h:358
static PGconn * conn
Definition: libpq_fetch.c:28
void libpq_executeFileMap(filemap_t *map)
Definition: libpq_fetch.c:438
static bool showprogress
Definition: pg_basebackup.c:93
static void run_simple_command(const char *sql)
Definition: libpq_fetch.c:132
void libpqConnect(const char *connstr)
Definition: libpq_fetch.c:45
void PQclear(PGresult *res)
Definition: fe-exec.c:695
void remove_target(file_entry_t *entry)
Definition: file_ops.c:125
uint64 XLogRecPtr
Definition: xlogdefs.h:21
datapagemap_iterator_t * datapagemap_iterate(datapagemap_t *map)
Definition: datapagemap.c:76
char * path
Definition: filemap.h:44
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
void process_source_file(const char *path, file_type_t type, size_t newsize, const char *link_target)
Definition: filemap.c:136
static char * filename
Definition: pg_dumpall.c:91
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:2709
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1954
static void fetch_file_range(const char *path, uint64 begin, uint64 end)
Definition: libpq_fetch.c:409
int i
Definition: filemap.h:42
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1940
void create_target(file_entry_t *entry)
Definition: file_ops.c:146
XLogRecPtr libpqGetCurrentXlogInsertLocation(void)
Definition: libpq_fetch.c:149
file_type_t
Definition: filemap.h:35
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3189
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:6508
int PQfformat(const PGresult *res, int field_num)
Definition: fe-exec.c:2997
#define snprintf
Definition: port.h:192
char * libpqGetFile(const char *filename, size_t *filesize)
Definition: libpq_fetch.c:366
#define UINT64_FORMAT
Definition: c.h:401
#define pg_ntoh64(x)
Definition: pg_bswap.h:126
long val
Definition: informix.c:684
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1779
#define pg_log_info(...)
Definition: logging.h:87
PGconn * PQconnectdb(const char *conninfo)
Definition: fe-connect.c:669
static char * connstr
Definition: pg_dumpall.c:62