PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
libpq_source.c File Reference
#include "postgres_fe.h"
#include "catalog/pg_type_d.h"
#include "common/connect.h"
#include "file_ops.h"
#include "filemap.h"
#include "lib/stringinfo.h"
#include "pg_rewind.h"
#include "port/pg_bswap.h"
#include "rewind_source.h"
Include dependency graph for libpq_source.c:

Go to the source code of this file.

Data Structures

struct  fetch_range_request
 
struct  libpq_source
 

Macros

#define MAX_CHUNK_SIZE   (1024 * 1024)
 
#define MAX_CHUNKS_PER_QUERY   1000
 

Functions

static void init_libpq_conn (PGconn *conn)
 
static char * run_simple_query (PGconn *conn, const char *sql)
 
static void run_simple_command (PGconn *conn, const char *sql)
 
static void appendArrayEscapedString (StringInfo buf, const char *str)
 
static void process_queued_fetch_requests (libpq_source *src)
 
static void libpq_traverse_files (rewind_source *source, process_file_callback_t callback)
 
static void libpq_queue_fetch_file (rewind_source *source, const char *path, size_t len)
 
static void libpq_queue_fetch_range (rewind_source *source, const char *path, off_t off, size_t len)
 
static void libpq_finish_fetch (rewind_source *source)
 
static char * libpq_fetch_file (rewind_source *source, const char *path, size_t *filesize)
 
static XLogRecPtr libpq_get_current_wal_insert_lsn (rewind_source *source)
 
static void libpq_destroy (rewind_source *source)
 
rewind_source * init_libpq_source (PGconn *conn)
 

Macro Definition Documentation

◆ MAX_CHUNK_SIZE

#define MAX_CHUNK_SIZE   (1024 * 1024)

Definition at line 25 of file libpq_source.c.

◆ MAX_CHUNKS_PER_QUERY

#define MAX_CHUNKS_PER_QUERY   1000

Definition at line 26 of file libpq_source.c.

Function Documentation

◆ appendArrayEscapedString()

static void appendArrayEscapedString ( StringInfo  buf,
const char *  str 
)
static

Definition at line 613 of file libpq_source.c.

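614{
615 appendStringInfoCharMacro(buf, '\"');
616 while (*str)
617 {
618 char ch = *str;
619
620 if (ch == '"' || ch == '\\')
621 appendStringInfoCharMacro(buf, '\\');
622
623 appendStringInfoCharMacro(buf, ch);
624
625 str++;
626 }
627 appendStringInfoCharMacro(buf, '\"');
628}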
const char * str
static char * buf
Definition: pg_test_fsync.c:72
#define appendStringInfoCharMacro(str, ch)
Definition: stringinfo.h:231

References appendStringInfoCharMacro, buf, and str.

Referenced by process_queued_fetch_requests().

◆ init_libpq_conn()

static void init_libpq_conn ( PGconn * conn)
static

Definition at line 110 of file libpq_source.c.

111{
112 PGresult *res;
113 char *str;
114
115 /* disable all types of timeouts */
116 run_simple_command(conn, "SET statement_timeout = 0");
117 run_simple_command(conn, "SET lock_timeout = 0");
118 run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
119 run_simple_command(conn, "SET transaction_timeout = 0");
120
121 /*
122 * we don't intend to do any updates, put the connection in read-only mode
123 * to keep us honest
124 */
125 run_simple_command(conn, "SET default_transaction_read_only = on");
126
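127 /* secure search_path */
128 res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
129 if (PQresultStatus(res) != PGRES_TUPLES_OK)
130 pg_fatal("could not clear \"search_path\": %s",
131 PQresultErrorMessage(res));
132 PQclear(res);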
133
134 /*
135 * Also check that full_page_writes is enabled. We can get torn pages if
136 * a page is modified while we read it with pg_read_binary_file(), and we
137 * rely on full page images to fix them.
138 */
139 str = run_simple_query(conn, "SHOW full_page_writes");
140 if (strcmp(str, "on") != 0)
141 pg_fatal("\"full_page_writes\" must be enabled in the source server");
142 pg_free(str);
143
144 /* Prepare a statement we'll use to fetch files */
145 res = PQprepare(conn, "fetch_chunks_stmt",
146 "SELECT path, begin,\n"
147 " pg_read_binary_file(path, begin, len, true) AS chunk\n"
148 "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
149 3, NULL);
150
151 if (PQresultStatus(res) != PGRES_COMMAND_OK)
152 pg_fatal("could not prepare statement to fetch file contents: %s",
153 PQresultErrorMessage(res));
154 PQclear(res);
155}
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
void PQclear(PGresult *res)
Definition: fe-exec.c:721
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2306
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
void pg_free(void *ptr)
Definition: fe_memutils.c:105
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:124
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:127
static void run_simple_command(PGconn *conn, const char *sql)
Definition: libpq_source.c:191
static char * run_simple_query(PGconn *conn, const char *sql)
Definition: libpq_source.c:163
#define pg_fatal(...)
PGconn * conn
Definition: streamutil.c:52

References ALWAYS_SECURE_SEARCH_PATH_SQL, conn, pg_fatal, pg_free(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQexec(), PQprepare(), PQresultErrorMessage(), PQresultStatus(), run_simple_command(), run_simple_query(), and str.

Referenced by init_libpq_source().

◆ init_libpq_source()

rewind_source * init_libpq_source ( PGconn * conn)

Definition at line 81 of file libpq_source.c.

82{
83 libpq_source *src;
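84
85 init_libpq_conn(conn);
86
87 src = pg_malloc0(sizeof(libpq_source));
88
89 src->common.traverse_files = libpq_traverse_files;
90 src->common.fetch_file = libpq_fetch_file;
91 src->common.queue_fetch_file = libpq_queue_fetch_file;
92 src->common.queue_fetch_range = libpq_queue_fetch_range;
93 src->common.finish_fetch = libpq_finish_fetch;
94 src->common.get_current_wal_insert_lsn = libpq_get_current_wal_insert_lsn;
95 src->common.destroy = libpq_destroy;
96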
97 src->conn = conn;
98
99 initStringInfo(&src->paths);
100 initStringInfo(&src->offsets);
101 initStringInfo(&src->lengths);
102
103 return &src->common;
104}
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
static void init_libpq_conn(PGconn *conn)
Definition: libpq_source.c:110
static char * libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
Definition: libpq_source.c:634
static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source)
Definition: libpq_source.c:208
static void libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
Definition: libpq_source.c:232
static void libpq_destroy(rewind_source *source)
Definition: libpq_source.c:674
static void libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off, size_t len)
Definition: libpq_source.c:355
static void libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
Definition: libpq_source.c:325
static void libpq_finish_fetch(rewind_source *source)
Definition: libpq_source.c:420
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
StringInfoData paths
Definition: libpq_source.c:50
StringInfoData lengths
Definition: libpq_source.c:52
PGconn * conn
Definition: libpq_source.c:40
StringInfoData offsets
Definition: libpq_source.c:51
rewind_source common
Definition: libpq_source.c:38
void(* queue_fetch_file)(struct rewind_source *, const char *path, size_t len)
Definition: rewind_source.h:60
void(* traverse_files)(struct rewind_source *, process_file_callback_t callback)
Definition: rewind_source.h:29
void(* finish_fetch)(struct rewind_source *)
Definition: rewind_source.h:66
XLogRecPtr(* get_current_wal_insert_lsn)(struct rewind_source *)
Definition: rewind_source.h:71
void(* queue_fetch_range)(struct rewind_source *, const char *path, off_t offset, size_t len)
Definition: rewind_source.h:47
char *(* fetch_file)(struct rewind_source *, const char *path, size_t *filesize)
Definition: rewind_source.h:37
void(* destroy)(struct rewind_source *)
Definition: rewind_source.h:76

References libpq_source::common, conn, libpq_source::conn, rewind_source::destroy, rewind_source::fetch_file, rewind_source::finish_fetch, rewind_source::get_current_wal_insert_lsn, init_libpq_conn(), initStringInfo(), libpq_source::lengths, libpq_destroy(), libpq_fetch_file(), libpq_finish_fetch(), libpq_get_current_wal_insert_lsn(), libpq_queue_fetch_file(), libpq_queue_fetch_range(), libpq_traverse_files(), libpq_source::offsets, libpq_source::paths, pg_malloc0(), rewind_source::queue_fetch_file, rewind_source::queue_fetch_range, and rewind_source::traverse_files.

Referenced by main().

◆ libpq_destroy()

static void libpq_destroy ( rewind_source * source)
static

Definition at line 674 of file libpq_source.c.

675{
676 libpq_source *src = (libpq_source *) source;
677
678 pfree(src->paths.data);
679 pfree(src->offsets.data);
680 pfree(src->lengths.data);
681 pfree(src);
682
683 /* NOTE: we don't close the connection here, as it was not opened by us. */
684}
void pfree(void *pointer)
Definition: mcxt.c:1524
static rewind_source * source
Definition: pg_rewind.c:89

References StringInfoData::data, libpq_source::lengths, libpq_source::offsets, libpq_source::paths, pfree(), and source.

Referenced by init_libpq_source().

◆ libpq_fetch_file()

static char * libpq_fetch_file ( rewind_source * source,
const char *  path,
size_t *  filesize 
)
static

Definition at line 634 of file libpq_source.c.

635{
636 PGconn *conn = ((libpq_source *) source)->conn;
637 PGresult *res;
638 char *result;
639 int len;
640 const char *paramValues[1];
641
642 paramValues[0] = path;
643 res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
644 1, NULL, paramValues, NULL, NULL, 1);
645
646 if (PQresultStatus(res) != PGRES_TUPLES_OK)
647 pg_fatal("could not fetch remote file \"%s\": %s",
648 path, PQresultErrorMessage(res));
649
650 /* sanity check the result set */
651 if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
652 pg_fatal("unexpected result set while fetching remote file \"%s\"",
653 path);
654
655 /* Read result to local variables */
656 len = PQgetlength(res, 0, 0);
657 result = pg_malloc(len + 1);
658 memcpy(result, PQgetvalue(res, 0, 0), len);
659 result[len] = '\0';
660
661 PQclear(res);
662
663 pg_log_debug("fetched file \"%s\", length %d", path, len);
664
665 if (filesize)
666 *filesize = len;
667 return result;
668}
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3887
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:2276
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3901
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define pg_log_debug(...)
Definition: logging.h:133
const void size_t len

References conn, len, pg_fatal, pg_log_debug, pg_malloc(), PGRES_TUPLES_OK, PQclear(), PQexecParams(), PQgetisnull(), PQgetlength(), PQgetvalue(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), and source.

Referenced by init_libpq_source().

◆ libpq_finish_fetch()

static void libpq_finish_fetch ( rewind_source * source)
static

Definition at line 420 of file libpq_source.c.

421{
422 process_queued_fetch_requests((libpq_source *) source);
423}
static void process_queued_fetch_requests(libpq_source *src)
Definition: libpq_source.c:426

References process_queued_fetch_requests(), and source.

Referenced by init_libpq_source().

◆ libpq_get_current_wal_insert_lsn()

static XLogRecPtr libpq_get_current_wal_insert_lsn ( rewind_source * source)
static

Definition at line 208 of file libpq_source.c.

209{
210 PGconn *conn = ((libpq_source *) source)->conn;
211 XLogRecPtr result;
212 uint32 hi;
213 uint32 lo;
214 char *val;
215
216 val = run_simple_query(conn, "SELECT pg_current_wal_insert_lsn()");
217
218 if (sscanf(val, "%X/%X", &hi, &lo) != 2)
219 pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
220
221 result = ((uint64) hi) << 32 | lo;
222
223 pg_free(val);
224
225 return result;
226}
uint64_t uint64
Definition: c.h:503
uint32_t uint32
Definition: c.h:502
long val
Definition: informix.c:689
uint64 XLogRecPtr
Definition: xlogdefs.h:21

References conn, pg_fatal, pg_free(), run_simple_query(), source, and val.

Referenced by init_libpq_source().

◆ libpq_queue_fetch_file()

static void libpq_queue_fetch_file ( rewind_source * source,
const char *  path,
size_t  len 
)
static

Definition at line 325 of file libpq_source.c.

326{
327 /*
328 * Truncate the target file immediately, and queue a request to fetch it
329 * from the source. If the file is small, smaller than MAX_CHUNK_SIZE,
330 * request fetching a full-sized chunk anyway, so that if the file has
331 * become larger in the source system, after we scanned the source
332 * directory, we still fetch the whole file. This only works for files up
333 * to MAX_CHUNK_SIZE, but that's good enough for small configuration files
334 * and such that are changed every now and then, but not WAL-logged. For
335 * larger files, we fetch up to the original size.
336 *
337 * Even with that mechanism, there is an inherent race condition if the
338 * file is modified at the same instant that we're copying it, so that we
339 * might copy a torn version of the file with one half from the old
340 * version and another half from the new. But pg_basebackup has the same
341 * problem, and it hasn't been a problem in practice.
342 *
343 * It might seem more natural to truncate the file later, when we receive
344 * it from the source server, but then we'd need to track which
345 * fetch-requests are for a whole file.
346 */
347 open_target_file(path, true);
348 libpq_queue_fetch_range(source, path, 0, Max(len, MAX_CHUNK_SIZE));
349}
#define Max(x, y)
Definition: c.h:969
void open_target_file(const char *path, bool trunc)
Definition: file_ops.c:47
#define MAX_CHUNK_SIZE
Definition: libpq_source.c:25

References len, libpq_queue_fetch_range(), Max, MAX_CHUNK_SIZE, open_target_file(), and source.

Referenced by init_libpq_source().

◆ libpq_queue_fetch_range()

static void libpq_queue_fetch_range ( rewind_source * source,
const char *  path,
off_t  off,
size_t  len 
)
static

Definition at line 355 of file libpq_source.c.

357{
358 libpq_source *src = (libpq_source *) source;
359
360 /*
361 * Does this request happen to be a continuation of the previous chunk? If
362 * so, merge it with the previous one.
363 *
364 * XXX: We use pointer equality to compare the path. That's good enough
365 * for our purposes; the caller always passes the same pointer for the
366 * same filename. If it didn't, we would fail to merge requests, but it
367 * wouldn't affect correctness.
368 */
369 if (src->num_requests > 0)
370 {
371 fetch_range_request *prev = &src->request_queue[src->num_requests - 1];
372
373 if (prev->offset + prev->length == off &&
374 prev->length < MAX_CHUNK_SIZE &&
375 prev->path == path)
376 {
377 /*
378 * Extend the previous request to cover as much of this new
379 * request as possible, without exceeding MAX_CHUNK_SIZE.
380 */
381 size_t thislen;
382
383 thislen = Min(len, MAX_CHUNK_SIZE - prev->length);
384 prev->length += thislen;
385
386 off += thislen;
387 len -= thislen;
388
389 /*
390 * Fall through to create new requests for any remaining 'len'
391 * that didn't fit in the previous chunk.
392 */
393 }
394 }
395
396 /* Divide the request into pieces of MAX_CHUNK_SIZE bytes each */
397 while (len > 0)
398 {
399 int32 thislen;
400
401 /* if the queue is full, perform all the work queued up so far */
402 if (src->num_requests == MAX_CHUNKS_PER_QUERY)
403 process_queued_fetch_requests(src);
404
405 thislen = Min(len, MAX_CHUNK_SIZE);
406 src->request_queue[src->num_requests].path = path;
407 src->request_queue[src->num_requests].offset = off;
408 src->request_queue[src->num_requests].length = thislen;
409 src->num_requests++;
410
411 off += thislen;
412 len -= thislen;
413 }
414}
#define Min(x, y)
Definition: c.h:975
int32_t int32
Definition: c.h:498
#define MAX_CHUNKS_PER_QUERY
Definition: libpq_source.c:26
const char * path
Definition: libpq_source.c:31
fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY]
Definition: libpq_source.c:47

References len, fetch_range_request::length, MAX_CHUNK_SIZE, MAX_CHUNKS_PER_QUERY, Min, libpq_source::num_requests, fetch_range_request::offset, fetch_range_request::path, process_queued_fetch_requests(), libpq_source::request_queue, and source.

Referenced by init_libpq_source(), and libpq_queue_fetch_file().

◆ libpq_traverse_files()

static void libpq_traverse_files ( rewind_source * source,
process_file_callback_t  callback 
)
static

Definition at line 232 of file libpq_source.c.

233{
234 PGconn *conn = ((libpq_source *) source)->conn;
235 PGresult *res;
236 const char *sql;
237 int i;
238
239 /*
240 * Create a recursive directory listing of the whole data directory.
241 *
242 * The WITH RECURSIVE part does most of the work. The second part gets the
243 * targets of the symlinks in pg_tblspc directory.
244 *
245 * XXX: There is no backend function to get a symbolic link's target in
246 * general, so if the admin has put any custom symbolic links in the data
247 * directory, they won't be copied correctly.
248 */
249 sql =
250 "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
251 " SELECT '' AS path, filename, size, isdir FROM\n"
252 " (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
253 " pg_stat_file(fn.filename, true) AS this\n"
254 " UNION ALL\n"
255 " SELECT parent.path || parent.filename || '/' AS path,\n"
256 " fn, this.size, this.isdir\n"
257 " FROM files AS parent,\n"
258 " pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
259 " pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
260 " WHERE parent.isdir = 't'\n"
261 ")\n"
262 "SELECT path || filename, size, isdir,\n"
263 " pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
264 "FROM files\n"
265 "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
266 " AND oid::text = files.filename\n";
267 res = PQexec(conn, sql);
268
269 if (PQresultStatus(res) != PGRES_TUPLES_OK)
270 pg_fatal("could not fetch file list: %s",
271 PQresultErrorMessage(res));
272
273 /* sanity check the result set */
274 if (PQnfields(res) != 4)
275 pg_fatal("unexpected result set while fetching file list");
276
277 /* Read result to local variables */
278 for (i = 0; i < PQntuples(res); i++)
279 {
280 char *path;
281 int64 filesize;
282 bool isdir;
283 char *link_target;
284 file_type_t type;
285
286 if (PQgetisnull(res, i, 1))
287 {
288 /*
289 * The file was removed from the server while the query was
290 * running. Ignore it.
291 */
292 continue;
293 }
294
295 path = PQgetvalue(res, i, 0);
296 filesize = atoll(PQgetvalue(res, i, 1));
297 isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
298 link_target = PQgetvalue(res, i, 3);
299
300 if (link_target[0])
301 {
302 /*
303 * In-place tablespaces are directories located in pg_tblspc/ with
304 * relative paths.
305 */
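306 if (is_absolute_path(link_target))
307 type = FILE_TYPE_SYMLINK;
308 else
309 type = FILE_TYPE_DIRECTORY;
310 }
311 else if (isdir)
312 type = FILE_TYPE_DIRECTORY;
313 else
314 type = FILE_TYPE_REGULAR;
315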
316 callback(path, type, filesize, link_target);
317 }
318 PQclear(res);
319}
int64_t int64
Definition: c.h:499
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
file_type_t
Definition: filemap.h:31
@ FILE_TYPE_REGULAR
Definition: filemap.h:34
@ FILE_TYPE_SYMLINK
Definition: filemap.h:36
@ FILE_TYPE_DIRECTORY
Definition: filemap.h:35
int i
Definition: isn.c:72
#define is_absolute_path(filename)
Definition: port.h:104
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46
const char * type

References callback(), conn, FILE_TYPE_DIRECTORY, FILE_TYPE_REGULAR, FILE_TYPE_SYMLINK, i, is_absolute_path, pg_fatal, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetisnull(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), source, and type.

Referenced by init_libpq_source().

◆ process_queued_fetch_requests()

static void process_queued_fetch_requests ( libpq_source * src)
static

Definition at line 426 of file libpq_source.c.

427{
428 const char *params[3];
429 PGresult *res;
430 int chunkno;
431
432 if (src->num_requests == 0)
433 return;
434
435 pg_log_debug("getting %d file chunks", src->num_requests);
436
437 /*
438 * The prepared statement, 'fetch_chunks_stmt', takes three arrays with
439 * the same length as parameters: paths, offsets and lengths. Construct
440 * the string representations of them.
441 */
442 resetStringInfo(&src->paths);
443 resetStringInfo(&src->offsets);
444 resetStringInfo(&src->lengths);
445
446 appendStringInfoChar(&src->paths, '{');
447 appendStringInfoChar(&src->offsets, '{');
448 appendStringInfoChar(&src->lengths, '{');
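449 for (int i = 0; i < src->num_requests; i++)
450 {
451 fetch_range_request *rq = &src->request_queue[i];
452
453 if (i > 0)
454 {
455 appendStringInfoChar(&src->paths, ',');
456 appendStringInfoChar(&src->offsets, ',');
457 appendStringInfoChar(&src->lengths, ',');
458 }
459
460 appendArrayEscapedString(&src->paths, rq->path);
461 appendStringInfo(&src->offsets, INT64_FORMAT, (int64) rq->offset);
462 appendStringInfo(&src->lengths, INT64_FORMAT, (int64) rq->length);
463 }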
464 appendStringInfoChar(&src->paths, '}');
465 appendStringInfoChar(&src->offsets, '}');
466 appendStringInfoChar(&src->lengths, '}');
467
468 /*
469 * Execute the prepared statement.
470 */
471 params[0] = src->paths.data;
472 params[1] = src->offsets.data;
473 params[2] = src->lengths.data;
474
475 if (PQsendQueryPrepared(src->conn, "fetch_chunks_stmt", 3, params, NULL, NULL, 1) != 1)
476 pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
477
478 if (PQsetSingleRowMode(src->conn) != 1)
479 pg_fatal("could not set libpq connection to single row mode");
480
481 /*----
482 * The result set is of format:
483 *
484 * path text -- path in the data directory, e.g "base/1/123"
485 * begin int8 -- offset within the file
486 * chunk bytea -- file content
487 *----
488 */
489 chunkno = 0;
490 while ((res = PQgetResult(src->conn)) != NULL)
491 {
492 fetch_range_request *rq = &src->request_queue[chunkno];
493 char *filename;
494 int filenamelen;
495 int64 chunkoff;
496 int chunksize;
497 char *chunk;
498
499 switch (PQresultStatus(res))
500 {
501 case PGRES_SINGLE_TUPLE:
502 break;
503
504 case PGRES_TUPLES_OK:
505 PQclear(res);
506 continue; /* final zero-row result */
507
508 default:
509 pg_fatal("unexpected result while fetching remote files: %s",
510 PQresultErrorMessage(res));
511 }
512
513 if (chunkno > src->num_requests)
514 pg_fatal("received more data chunks than requested");
515
516 /* sanity check the result set */
517 if (PQnfields(res) != 3 || PQntuples(res) != 1)
518 pg_fatal("unexpected result set size while fetching remote files");
519
520 if (PQftype(res, 0) != TEXTOID ||
521 PQftype(res, 1) != INT8OID ||
522 PQftype(res, 2) != BYTEAOID)
523 {
524 pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u",
525 PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
526 }
527
528 if (PQfformat(res, 0) != 1 &&
529 PQfformat(res, 1) != 1 &&
530 PQfformat(res, 2) != 1)
531 {
532 pg_fatal("unexpected result format while fetching remote files");
533 }
534
535 if (PQgetisnull(res, 0, 0) ||
536 PQgetisnull(res, 0, 1))
537 {
538 pg_fatal("unexpected null values in result while fetching remote files");
539 }
540
541 if (PQgetlength(res, 0, 1) != sizeof(int64))
542 pg_fatal("unexpected result length while fetching remote files");
543
544 /* Read result set to local variables */
545 memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
546 chunkoff = pg_ntoh64(chunkoff);
547 chunksize = PQgetlength(res, 0, 2);
548
549 filenamelen = PQgetlength(res, 0, 0);
550 filename = pg_malloc(filenamelen + 1);
551 memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
552 filename[filenamelen] = '\0';
553
554 chunk = PQgetvalue(res, 0, 2);
555
556 /*
557 * If a file has been deleted on the source, remove it on the target
558 * as well. Note that multiple unlink() calls may happen on the same
559 * file if multiple data chunks are associated with it, hence ignore
560 * unconditionally anything missing.
561 */
562 if (PQgetisnull(res, 0, 2))
563 {
564 pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
565 filename);
566 remove_target_file(filename, true);
567 }
568 else
569 {
570 pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
571 filename, (long long int) chunkoff, chunksize);
572
573 if (strcmp(filename, rq->path) != 0)
574 {
575 pg_fatal("received data for file \"%s\", when requested for \"%s\"",
576 filename, rq->path);
577 }
578 if (chunkoff != rq->offset)
579 pg_fatal("received data at offset %lld of file \"%s\", when requested for offset %lld",
580 (long long int) chunkoff, rq->path, (long long int) rq->offset);
581
582 /*
583 * We should not receive more data than we requested, or
584 * pg_read_binary_file() messed up. We could receive less,
585 * though, if the file was truncated in the source after we
586 * checked its size. That's OK, there should be a WAL record of
587 * the truncation, which will get replayed when you start the
588 * target system for the first time after pg_rewind has completed.
589 */
590 if (chunksize > rq->length)
591 pg_fatal("received more than requested for file \"%s\"", rq->path);
592
593 open_target_file(filename, false);
594
595 write_target_range(chunk, chunkoff, chunksize);
596 }
597
598 pg_free(filename);
599
600 PQclear(res);
601 chunkno++;
602 }
603 if (chunkno != src->num_requests)
604 pg_fatal("unexpected number of data chunks received");
605
606 src->num_requests = 0;
607}
#define INT64_FORMAT
Definition: c.h:520
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7553
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1948
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3719
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
int PQfformat(const PGresult *res, int field_num)
Definition: fe-exec.c:3708
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1633
void remove_target_file(const char *path, bool missing_ok)
Definition: file_ops.c:187
void write_target_range(char *buf, off_t begin, size_t size)
Definition: file_ops.c:88
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:137
static void appendArrayEscapedString(StringInfo buf, const char *str)
Definition: libpq_source.c:613
#define pg_ntoh64(x)
Definition: pg_bswap.h:126
static char * filename
Definition: pg_dumpall.c:123
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242

References appendArrayEscapedString(), appendStringInfo(), appendStringInfoChar(), libpq_source::conn, StringInfoData::data, filename, i, INT64_FORMAT, fetch_range_request::length, libpq_source::lengths, libpq_source::num_requests, fetch_range_request::offset, libpq_source::offsets, open_target_file(), fetch_range_request::path, libpq_source::paths, pg_fatal, pg_free(), pg_log_debug, pg_malloc(), pg_ntoh64, PGRES_SINGLE_TUPLE, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQfformat(), PQftype(), PQgetisnull(), PQgetlength(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), PQsendQueryPrepared(), PQsetSingleRowMode(), remove_target_file(), libpq_source::request_queue, resetStringInfo(), and write_target_range().

Referenced by libpq_finish_fetch(), and libpq_queue_fetch_range().

◆ run_simple_command()

static void run_simple_command ( PGconn * conn,
const char *  sql 
)
static

Definition at line 191 of file libpq_source.c.

192{
193 PGresult *res;
194
195 res = PQexec(conn, sql);
196
197 if (PQresultStatus(res) != PGRES_COMMAND_OK)
198 pg_fatal("error running query (%s) in source server: %s",
199 sql, PQresultErrorMessage(res));
200
201 PQclear(res);
202}

References conn, pg_fatal, PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultErrorMessage(), and PQresultStatus().

Referenced by init_libpq_conn().

◆ run_simple_query()

static char * run_simple_query ( PGconn * conn,
const char *  sql 
)
static

Definition at line 163 of file libpq_source.c.

164{
165 PGresult *res;
166 char *result;
167
168 res = PQexec(conn, sql);
169
170 if (PQresultStatus(res) != PGRES_TUPLES_OK)
171 pg_fatal("error running query (%s) on source server: %s",
172 sql, PQresultErrorMessage(res));
173
174 /* sanity check the result set */
175 if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
176 pg_fatal("unexpected result set from query");
177
178 result = pg_strdup(PQgetvalue(res, 0, 0));
179
180 PQclear(res);
181
182 return result;
183}
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85

References conn, pg_fatal, pg_strdup(), PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetisnull(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), and PQresultStatus().

Referenced by init_libpq_conn(), and libpq_get_current_wal_insert_lsn().