PostgreSQL Source Code  git master
libpq_source.c File Reference
#include "postgres_fe.h"
#include "catalog/pg_type_d.h"
#include "common/connect.h"
#include "datapagemap.h"
#include "file_ops.h"
#include "filemap.h"
#include "lib/stringinfo.h"
#include "pg_rewind.h"
#include "port/pg_bswap.h"
#include "rewind_source.h"
Include dependency graph for libpq_source.c:

Go to the source code of this file.

Data Structures

struct  fetch_range_request
 
struct  libpq_source
 

Macros

#define MAX_CHUNK_SIZE   (1024 * 1024)
 
#define MAX_CHUNKS_PER_QUERY   1000
 

Functions

static void init_libpq_conn (PGconn *conn)
 
static char * run_simple_query (PGconn *conn, const char *sql)
 
static void run_simple_command (PGconn *conn, const char *sql)
 
static void appendArrayEscapedString (StringInfo buf, const char *str)
 
static void process_queued_fetch_requests (libpq_source *src)
 
static void libpq_traverse_files (rewind_source *source, process_file_callback_t callback)
 
static void libpq_queue_fetch_file (rewind_source *source, const char *path, size_t len)
 
static void libpq_queue_fetch_range (rewind_source *source, const char *path, off_t off, size_t len)
 
static void libpq_finish_fetch (rewind_source *source)
 
static char * libpq_fetch_file (rewind_source *source, const char *path, size_t *filesize)
 
static XLogRecPtr libpq_get_current_wal_insert_lsn (rewind_source *source)
 
static void libpq_destroy (rewind_source *source)
 
rewind_sourceinit_libpq_source (PGconn *conn)
 

Macro Definition Documentation

◆ MAX_CHUNK_SIZE

#define MAX_CHUNK_SIZE   (1024 * 1024)

Definition at line 26 of file libpq_source.c.

◆ MAX_CHUNKS_PER_QUERY

#define MAX_CHUNKS_PER_QUERY   1000

Definition at line 27 of file libpq_source.c.

Function Documentation

◆ appendArrayEscapedString()

static void appendArrayEscapedString ( StringInfo  buf,
const char *  str 
)
static

Definition at line 614 of file libpq_source.c.

615 {
617  while (*str)
618  {
619  char ch = *str;
620 
621  if (ch == '"' || ch == '\\')
623 
625 
626  str++;
627  }
629 }
const char * str
static char * buf
Definition: pg_test_fsync.c:73
#define appendStringInfoCharMacro(str, ch)
Definition: stringinfo.h:204

References appendStringInfoCharMacro, buf, and str.

Referenced by process_queued_fetch_requests().

◆ init_libpq_conn()

static void init_libpq_conn ( PGconn conn)
static

Definition at line 111 of file libpq_source.c.

112 {
113  PGresult *res;
114  char *str;
115 
116  /* disable all types of timeouts */
117  run_simple_command(conn, "SET statement_timeout = 0");
118  run_simple_command(conn, "SET lock_timeout = 0");
119  run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
120  run_simple_command(conn, "SET transaction_timeout = 0");
121 
122  /*
123  * we don't intend to do any updates, put the connection in read-only mode
124  * to keep us honest
125  */
126  run_simple_command(conn, "SET default_transaction_read_only = on");
127 
128  /* secure search_path */
131  pg_fatal("could not clear search_path: %s",
133  PQclear(res);
134 
135  /*
136  * Also check that full_page_writes is enabled. We can get torn pages if
137  * a page is modified while we read it with pg_read_binary_file(), and we
138  * rely on full page images to fix them.
139  */
140  str = run_simple_query(conn, "SHOW full_page_writes");
141  if (strcmp(str, "on") != 0)
142  pg_fatal("full_page_writes must be enabled in the source server");
143  pg_free(str);
144 
145  /* Prepare a statement we'll use to fetch files */
146  res = PQprepare(conn, "fetch_chunks_stmt",
147  "SELECT path, begin,\n"
148  " pg_read_binary_file(path, begin, len, true) AS chunk\n"
149  "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
150  3, NULL);
151 
153  pg_fatal("could not prepare statement to fetch file contents: %s",
155  PQclear(res);
156 }
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2306
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
void pg_free(void *ptr)
Definition: fe_memutils.c:105
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:100
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:103
static void run_simple_command(PGconn *conn, const char *sql)
Definition: libpq_source.c:192
static char * run_simple_query(PGconn *conn, const char *sql)
Definition: libpq_source.c:164
#define pg_fatal(...)
PGconn * conn
Definition: streamutil.c:55

References ALWAYS_SECURE_SEARCH_PATH_SQL, conn, pg_fatal, pg_free(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQexec(), PQprepare(), PQresultErrorMessage(), PQresultStatus(), res, run_simple_command(), run_simple_query(), and str.

Referenced by init_libpq_source().

◆ init_libpq_source()

rewind_source* init_libpq_source ( PGconn conn)

Definition at line 82 of file libpq_source.c.

83 {
84  libpq_source *src;
85 
87 
88  src = pg_malloc0(sizeof(libpq_source));
89 
97 
98  src->conn = conn;
99 
100  initStringInfo(&src->paths);
101  initStringInfo(&src->offsets);
102  initStringInfo(&src->lengths);
103 
104  return &src->common;
105 }
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
static void init_libpq_conn(PGconn *conn)
Definition: libpq_source.c:111
static char * libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
Definition: libpq_source.c:635
static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source)
Definition: libpq_source.c:209
static void libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
Definition: libpq_source.c:233
static void libpq_destroy(rewind_source *source)
Definition: libpq_source.c:675
static void libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off, size_t len)
Definition: libpq_source.c:356
static void libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
Definition: libpq_source.c:326
static void libpq_finish_fetch(rewind_source *source)
Definition: libpq_source.c:421
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
StringInfoData paths
Definition: libpq_source.c:51
StringInfoData lengths
Definition: libpq_source.c:53
PGconn * conn
Definition: libpq_source.c:41
StringInfoData offsets
Definition: libpq_source.c:52
rewind_source common
Definition: libpq_source.c:39
void(* queue_fetch_file)(struct rewind_source *, const char *path, size_t len)
Definition: rewind_source.h:60
void(* traverse_files)(struct rewind_source *, process_file_callback_t callback)
Definition: rewind_source.h:29
void(* finish_fetch)(struct rewind_source *)
Definition: rewind_source.h:66
XLogRecPtr(* get_current_wal_insert_lsn)(struct rewind_source *)
Definition: rewind_source.h:71
void(* queue_fetch_range)(struct rewind_source *, const char *path, off_t offset, size_t len)
Definition: rewind_source.h:47
char *(* fetch_file)(struct rewind_source *, const char *path, size_t *filesize)
Definition: rewind_source.h:37
void(* destroy)(struct rewind_source *)
Definition: rewind_source.h:76

References libpq_source::common, conn, libpq_source::conn, rewind_source::destroy, rewind_source::fetch_file, rewind_source::finish_fetch, rewind_source::get_current_wal_insert_lsn, init_libpq_conn(), initStringInfo(), libpq_source::lengths, libpq_destroy(), libpq_fetch_file(), libpq_finish_fetch(), libpq_get_current_wal_insert_lsn(), libpq_queue_fetch_file(), libpq_queue_fetch_range(), libpq_traverse_files(), libpq_source::offsets, libpq_source::paths, pg_malloc0(), rewind_source::queue_fetch_file, rewind_source::queue_fetch_range, and rewind_source::traverse_files.

Referenced by main().

◆ libpq_destroy()

static void libpq_destroy ( rewind_source source)
static

Definition at line 675 of file libpq_source.c.

676 {
677  libpq_source *src = (libpq_source *) source;
678 
679  pfree(src->paths.data);
680  pfree(src->offsets.data);
681  pfree(src->lengths.data);
682  pfree(src);
683 
684  /* NOTE: we don't close the connection here, as it was not opened by us. */
685 }
void pfree(void *pointer)
Definition: mcxt.c:1520
static rewind_source * source
Definition: pg_rewind.c:89

References StringInfoData::data, libpq_source::lengths, libpq_source::offsets, libpq_source::paths, pfree(), and source.

Referenced by init_libpq_source().

◆ libpq_fetch_file()

static char * libpq_fetch_file ( rewind_source source,
const char *  path,
size_t *  filesize 
)
static

Definition at line 635 of file libpq_source.c.

636 {
637  PGconn *conn = ((libpq_source *) source)->conn;
638  PGresult *res;
639  char *result;
640  int len;
641  const char *paramValues[1];
642 
643  paramValues[0] = path;
644  res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
645  1, NULL, paramValues, NULL, NULL, 1);
646 
648  pg_fatal("could not fetch remote file \"%s\": %s",
649  path, PQresultErrorMessage(res));
650 
651  /* sanity check the result set */
652  if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
653  pg_fatal("unexpected result set while fetching remote file \"%s\"",
654  path);
655 
656  /* Read result to local variables */
657  len = PQgetlength(res, 0, 0);
658  result = pg_malloc(len + 1);
659  memcpy(result, PQgetvalue(res, 0, 0), len);
660  result[len] = '\0';
661 
662  PQclear(res);
663 
664  pg_log_debug("fetched file \"%s\", length %d", path, len);
665 
666  if (filesize)
667  *filesize = len;
668  return result;
669 }
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3887
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:2276
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3901
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define pg_log_debug(...)
Definition: logging.h:133
const void size_t len

References conn, len, pg_fatal, pg_log_debug, pg_malloc(), PGRES_TUPLES_OK, PQclear(), PQexecParams(), PQgetisnull(), PQgetlength(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), res, and source.

Referenced by init_libpq_source().

◆ libpq_finish_fetch()

static void libpq_finish_fetch ( rewind_source source)
static

Definition at line 421 of file libpq_source.c.

422 {
424 }
static void process_queued_fetch_requests(libpq_source *src)
Definition: libpq_source.c:427

References process_queued_fetch_requests(), and source.

Referenced by init_libpq_source().

◆ libpq_get_current_wal_insert_lsn()

static XLogRecPtr libpq_get_current_wal_insert_lsn ( rewind_source source)
static

Definition at line 209 of file libpq_source.c.

210 {
211  PGconn *conn = ((libpq_source *) source)->conn;
212  XLogRecPtr result;
213  uint32 hi;
214  uint32 lo;
215  char *val;
216 
217  val = run_simple_query(conn, "SELECT pg_current_wal_insert_lsn()");
218 
219  if (sscanf(val, "%X/%X", &hi, &lo) != 2)
220  pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
221 
222  result = ((uint64) hi) << 32 | lo;
223 
224  pg_free(val);
225 
226  return result;
227 }
unsigned int uint32
Definition: c.h:506
long val
Definition: informix.c:670
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References conn, pg_fatal, pg_free(), run_simple_query(), source, and val.

Referenced by init_libpq_source().

◆ libpq_queue_fetch_file()

static void libpq_queue_fetch_file ( rewind_source source,
const char *  path,
size_t  len 
)
static

Definition at line 326 of file libpq_source.c.

327 {
328  /*
329  * Truncate the target file immediately, and queue a request to fetch it
330  * from the source. If the file is small, smaller than MAX_CHUNK_SIZE,
331  * request fetching a full-sized chunk anyway, so that if the file has
332  * become larger in the source system, after we scanned the source
333  * directory, we still fetch the whole file. This only works for files up
334  * to MAX_CHUNK_SIZE, but that's good enough for small configuration files
335  * and such that are changed every now and then, but not WAL-logged. For
336  * larger files, we fetch up to the original size.
337  *
338  * Even with that mechanism, there is an inherent race condition if the
339  * file is modified at the same instant that we're copying it, so that we
340  * might copy a torn version of the file with one half from the old
341  * version and another half from the new. But pg_basebackup has the same
342  * problem, and it hasn't been a problem in practice.
343  *
344  * It might seem more natural to truncate the file later, when we receive
345  * it from the source server, but then we'd need to track which
346  * fetch-requests are for a whole file.
347  */
348  open_target_file(path, true);
350 }
#define Max(x, y)
Definition: c.h:998
void open_target_file(const char *path, bool trunc)
Definition: file_ops.c:47
#define MAX_CHUNK_SIZE
Definition: libpq_source.c:26

References len, libpq_queue_fetch_range(), Max, MAX_CHUNK_SIZE, open_target_file(), and source.

Referenced by init_libpq_source().

◆ libpq_queue_fetch_range()

static void libpq_queue_fetch_range ( rewind_source source,
const char *  path,
off_t  off,
size_t  len 
)
static

Definition at line 356 of file libpq_source.c.

358 {
359  libpq_source *src = (libpq_source *) source;
360 
361  /*
362  * Does this request happen to be a continuation of the previous chunk? If
363  * so, merge it with the previous one.
364  *
365  * XXX: We use pointer equality to compare the path. That's good enough
366  * for our purposes; the caller always passes the same pointer for the
367  * same filename. If it didn't, we would fail to merge requests, but it
368  * wouldn't affect correctness.
369  */
370  if (src->num_requests > 0)
371  {
372  fetch_range_request *prev = &src->request_queue[src->num_requests - 1];
373 
374  if (prev->offset + prev->length == off &&
375  prev->length < MAX_CHUNK_SIZE &&
376  prev->path == path)
377  {
378  /*
379  * Extend the previous request to cover as much of this new
380  * request as possible, without exceeding MAX_CHUNK_SIZE.
381  */
382  size_t thislen;
383 
384  thislen = Min(len, MAX_CHUNK_SIZE - prev->length);
385  prev->length += thislen;
386 
387  off += thislen;
388  len -= thislen;
389 
390  /*
391  * Fall through to create new requests for any remaining 'len'
392  * that didn't fit in the previous chunk.
393  */
394  }
395  }
396 
397  /* Divide the request into pieces of MAX_CHUNK_SIZE bytes each */
398  while (len > 0)
399  {
400  int32 thislen;
401 
402  /* if the queue is full, perform all the work queued up so far */
405 
406  thislen = Min(len, MAX_CHUNK_SIZE);
407  src->request_queue[src->num_requests].path = path;
408  src->request_queue[src->num_requests].offset = off;
409  src->request_queue[src->num_requests].length = thislen;
410  src->num_requests++;
411 
412  off += thislen;
413  len -= thislen;
414  }
415 }
#define Min(x, y)
Definition: c.h:1004
signed int int32
Definition: c.h:494
#define MAX_CHUNKS_PER_QUERY
Definition: libpq_source.c:27
const char * path
Definition: libpq_source.c:32
fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY]
Definition: libpq_source.c:48

References len, fetch_range_request::length, MAX_CHUNK_SIZE, MAX_CHUNKS_PER_QUERY, Min, libpq_source::num_requests, fetch_range_request::offset, fetch_range_request::path, process_queued_fetch_requests(), libpq_source::request_queue, and source.

Referenced by init_libpq_source(), and libpq_queue_fetch_file().

◆ libpq_traverse_files()

static void libpq_traverse_files ( rewind_source source,
process_file_callback_t  callback 
)
static

Definition at line 233 of file libpq_source.c.

234 {
235  PGconn *conn = ((libpq_source *) source)->conn;
236  PGresult *res;
237  const char *sql;
238  int i;
239 
240  /*
241  * Create a recursive directory listing of the whole data directory.
242  *
243  * The WITH RECURSIVE part does most of the work. The second part gets the
244  * targets of the symlinks in pg_tblspc directory.
245  *
246  * XXX: There is no backend function to get a symbolic link's target in
247  * general, so if the admin has put any custom symbolic links in the data
248  * directory, they won't be copied correctly.
249  */
250  sql =
251  "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
252  " SELECT '' AS path, filename, size, isdir FROM\n"
253  " (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
254  " pg_stat_file(fn.filename, true) AS this\n"
255  " UNION ALL\n"
256  " SELECT parent.path || parent.filename || '/' AS path,\n"
257  " fn, this.size, this.isdir\n"
258  " FROM files AS parent,\n"
259  " pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
260  " pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
261  " WHERE parent.isdir = 't'\n"
262  ")\n"
263  "SELECT path || filename, size, isdir,\n"
264  " pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
265  "FROM files\n"
266  "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
267  " AND oid::text = files.filename\n";
268  res = PQexec(conn, sql);
269 
271  pg_fatal("could not fetch file list: %s",
273 
274  /* sanity check the result set */
275  if (PQnfields(res) != 4)
276  pg_fatal("unexpected result set while fetching file list");
277 
278  /* Read result to local variables */
279  for (i = 0; i < PQntuples(res); i++)
280  {
281  char *path;
282  int64 filesize;
283  bool isdir;
284  char *link_target;
286 
287  if (PQgetisnull(res, i, 1))
288  {
289  /*
290  * The file was removed from the server while the query was
291  * running. Ignore it.
292  */
293  continue;
294  }
295 
296  path = PQgetvalue(res, i, 0);
297  filesize = atol(PQgetvalue(res, i, 1));
298  isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
299  link_target = PQgetvalue(res, i, 3);
300 
301  if (link_target[0])
302  {
303  /*
304  * In-place tablespaces are directories located in pg_tblspc/ with
305  * relative paths.
306  */
307  if (is_absolute_path(link_target))
309  else
311  }
312  else if (isdir)
314  else
316 
317  callback(path, type, filesize, link_target);
318  }
319  PQclear(res);
320 }
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
file_type_t
Definition: filemap.h:31
@ FILE_TYPE_REGULAR
Definition: filemap.h:34
@ FILE_TYPE_SYMLINK
Definition: filemap.h:36
@ FILE_TYPE_DIRECTORY
Definition: filemap.h:35
int i
Definition: isn.c:73
#define is_absolute_path(filename)
Definition: port.h:103
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46
const char * type

References callback(), conn, FILE_TYPE_DIRECTORY, FILE_TYPE_REGULAR, FILE_TYPE_SYMLINK, i, is_absolute_path, pg_fatal, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetisnull(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), res, source, and type.

Referenced by init_libpq_source().

◆ process_queued_fetch_requests()

static void process_queued_fetch_requests ( libpq_source src)
static

Definition at line 427 of file libpq_source.c.

428 {
429  const char *params[3];
430  PGresult *res;
431  int chunkno;
432 
433  if (src->num_requests == 0)
434  return;
435 
436  pg_log_debug("getting %d file chunks", src->num_requests);
437 
438  /*
439  * The prepared statement, 'fetch_chunks_stmt', takes three arrays with
440  * the same length as parameters: paths, offsets and lengths. Construct
441  * the string representations of them.
442  */
443  resetStringInfo(&src->paths);
444  resetStringInfo(&src->offsets);
445  resetStringInfo(&src->lengths);
446 
447  appendStringInfoChar(&src->paths, '{');
448  appendStringInfoChar(&src->offsets, '{');
449  appendStringInfoChar(&src->lengths, '{');
450  for (int i = 0; i < src->num_requests; i++)
451  {
452  fetch_range_request *rq = &src->request_queue[i];
453 
454  if (i > 0)
455  {
456  appendStringInfoChar(&src->paths, ',');
457  appendStringInfoChar(&src->offsets, ',');
458  appendStringInfoChar(&src->lengths, ',');
459  }
460 
461  appendArrayEscapedString(&src->paths, rq->path);
462  appendStringInfo(&src->offsets, INT64_FORMAT, (int64) rq->offset);
463  appendStringInfo(&src->lengths, INT64_FORMAT, (int64) rq->length);
464  }
465  appendStringInfoChar(&src->paths, '}');
466  appendStringInfoChar(&src->offsets, '}');
467  appendStringInfoChar(&src->lengths, '}');
468 
469  /*
470  * Execute the prepared statement.
471  */
472  params[0] = src->paths.data;
473  params[1] = src->offsets.data;
474  params[2] = src->lengths.data;
475 
476  if (PQsendQueryPrepared(src->conn, "fetch_chunks_stmt", 3, params, NULL, NULL, 1) != 1)
477  pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
478 
479  if (PQsetSingleRowMode(src->conn) != 1)
480  pg_fatal("could not set libpq connection to single row mode");
481 
482  /*----
483  * The result set is of format:
484  *
485  * path text -- path in the data directory, e.g "base/1/123"
486  * begin int8 -- offset within the file
487  * chunk bytea -- file content
488  *----
489  */
490  chunkno = 0;
491  while ((res = PQgetResult(src->conn)) != NULL)
492  {
493  fetch_range_request *rq = &src->request_queue[chunkno];
494  char *filename;
495  int filenamelen;
496  int64 chunkoff;
497  int chunksize;
498  char *chunk;
499 
500  switch (PQresultStatus(res))
501  {
502  case PGRES_SINGLE_TUPLE:
503  break;
504 
505  case PGRES_TUPLES_OK:
506  PQclear(res);
507  continue; /* final zero-row result */
508 
509  default:
510  pg_fatal("unexpected result while fetching remote files: %s",
512  }
513 
514  if (chunkno > src->num_requests)
515  pg_fatal("received more data chunks than requested");
516 
517  /* sanity check the result set */
518  if (PQnfields(res) != 3 || PQntuples(res) != 1)
519  pg_fatal("unexpected result set size while fetching remote files");
520 
521  if (PQftype(res, 0) != TEXTOID ||
522  PQftype(res, 1) != INT8OID ||
523  PQftype(res, 2) != BYTEAOID)
524  {
525  pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u",
526  PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
527  }
528 
529  if (PQfformat(res, 0) != 1 &&
530  PQfformat(res, 1) != 1 &&
531  PQfformat(res, 2) != 1)
532  {
533  pg_fatal("unexpected result format while fetching remote files");
534  }
535 
536  if (PQgetisnull(res, 0, 0) ||
537  PQgetisnull(res, 0, 1))
538  {
539  pg_fatal("unexpected null values in result while fetching remote files");
540  }
541 
542  if (PQgetlength(res, 0, 1) != sizeof(int64))
543  pg_fatal("unexpected result length while fetching remote files");
544 
545  /* Read result set to local variables */
546  memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
547  chunkoff = pg_ntoh64(chunkoff);
548  chunksize = PQgetlength(res, 0, 2);
549 
550  filenamelen = PQgetlength(res, 0, 0);
551  filename = pg_malloc(filenamelen + 1);
552  memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
553  filename[filenamelen] = '\0';
554 
555  chunk = PQgetvalue(res, 0, 2);
556 
557  /*
558  * If a file has been deleted on the source, remove it on the target
559  * as well. Note that multiple unlink() calls may happen on the same
560  * file if multiple data chunks are associated with it, hence ignore
561  * unconditionally anything missing.
562  */
563  if (PQgetisnull(res, 0, 2))
564  {
565  pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
566  filename);
568  }
569  else
570  {
571  pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
572  filename, (long long int) chunkoff, chunksize);
573 
574  if (strcmp(filename, rq->path) != 0)
575  {
576  pg_fatal("received data for file \"%s\", when requested for \"%s\"",
577  filename, rq->path);
578  }
579  if (chunkoff != rq->offset)
580  pg_fatal("received data at offset %lld of file \"%s\", when requested for offset %lld",
581  (long long int) chunkoff, rq->path, (long long int) rq->offset);
582 
583  /*
584  * We should not receive more data than we requested, or
585  * pg_read_binary_file() messed up. We could receive less,
586  * though, if the file was truncated in the source after we
587  * checked its size. That's OK, there should be a WAL record of
588  * the truncation, which will get replayed when you start the
589  * target system for the first time after pg_rewind has completed.
590  */
591  if (chunksize > rq->length)
592  pg_fatal("received more than requested for file \"%s\"", rq->path);
593 
594  open_target_file(filename, false);
595 
596  write_target_range(chunk, chunkoff, chunksize);
597  }
598 
599  pg_free(filename);
600 
601  PQclear(res);
602  chunkno++;
603  }
604  if (chunkno != src->num_requests)
605  pg_fatal("unexpected number of data chunks received");
606 
607  src->num_requests = 0;
608 }
#define INT64_FORMAT
Definition: c.h:548
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7147
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1948
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3719
int PQfformat(const PGresult *res, int field_num)
Definition: fe-exec.c:3708
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1633
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
void remove_target_file(const char *path, bool missing_ok)
Definition: file_ops.c:187
void write_target_range(char *buf, off_t begin, size_t size)
Definition: file_ops.c:88
uint64 chunk
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:113
static void appendArrayEscapedString(StringInfo buf, const char *str)
Definition: libpq_source.c:614
#define pg_ntoh64(x)
Definition: pg_bswap.h:126
static char * filename
Definition: pg_dumpall.c:119
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:78
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:194

References appendArrayEscapedString(), appendStringInfo(), appendStringInfoChar(), chunk, libpq_source::conn, StringInfoData::data, filename, i, INT64_FORMAT, fetch_range_request::length, libpq_source::lengths, libpq_source::num_requests, fetch_range_request::offset, libpq_source::offsets, open_target_file(), fetch_range_request::path, libpq_source::paths, pg_fatal, pg_free(), pg_log_debug, pg_malloc(), pg_ntoh64, PGRES_SINGLE_TUPLE, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQfformat(), PQftype(), PQgetisnull(), PQgetlength(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), PQsendQueryPrepared(), PQsetSingleRowMode(), remove_target_file(), libpq_source::request_queue, res, resetStringInfo(), and write_target_range().

Referenced by libpq_finish_fetch(), and libpq_queue_fetch_range().

◆ run_simple_command()

static void run_simple_command ( PGconn conn,
const char *  sql 
)
static

Definition at line 192 of file libpq_source.c.

193 {
194  PGresult *res;
195 
196  res = PQexec(conn, sql);
197 
199  pg_fatal("error running query (%s) in source server: %s",
200  sql, PQresultErrorMessage(res));
201 
202  PQclear(res);
203 }

References conn, pg_fatal, PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultErrorMessage(), PQresultStatus(), and res.

Referenced by init_libpq_conn().

◆ run_simple_query()

static char * run_simple_query ( PGconn conn,
const char *  sql 
)
static

Definition at line 164 of file libpq_source.c.

165 {
166  PGresult *res;
167  char *result;
168 
169  res = PQexec(conn, sql);
170 
172  pg_fatal("error running query (%s) on source server: %s",
173  sql, PQresultErrorMessage(res));
174 
175  /* sanity check the result set */
176  if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
177  pg_fatal("unexpected result set from query");
178 
179  result = pg_strdup(PQgetvalue(res, 0, 0));
180 
181  PQclear(res);
182 
183  return result;
184 }
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85

References conn, pg_fatal, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetisnull(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), and res.

Referenced by init_libpq_conn(), and libpq_get_current_wal_insert_lsn().