PostgreSQL Source Code git master
libpq_source.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * libpq_source.c
4 * Functions for fetching files from a remote server via libpq.
5 *
6 * Copyright (c) 2013-2025, PostgreSQL Global Development Group
7 *
8 *-------------------------------------------------------------------------
9 */
10#include "postgres_fe.h"
11
12#include "catalog/pg_type_d.h"
13#include "common/connect.h"
14#include "file_ops.h"
15#include "filemap.h"
16#include "lib/stringinfo.h"
17#include "pg_rewind.h"
18#include "port/pg_bswap.h"
19#include "rewind_source.h"
20
21/*
22 * Files are fetched MAX_CHUNK_SIZE bytes at a time, and with a
23 * maximum of MAX_CHUNKS_PER_QUERY chunks in a single query.
24 */
25#define MAX_CHUNK_SIZE (1024 * 1024)
26#define MAX_CHUNKS_PER_QUERY 1000
27
28/* represents a request to fetch a piece of a file from the source */
29typedef struct
30{
31 const char *path; /* path relative to data directory root */
32 off_t offset;
33 size_t length;
35
36typedef struct
37{
38 rewind_source common; /* common interface functions */
39
41
42 /*
43 * Queue of chunks that have been requested with the queue_fetch_range()
44 * function, but have not been fetched from the remote server yet.
45 */
48
49 /* temporary space for process_queued_fetch_requests() */
54
55static void init_libpq_conn(PGconn *conn);
56static char *run_simple_query(PGconn *conn, const char *sql);
57static void run_simple_command(PGconn *conn, const char *sql);
58static void appendArrayEscapedString(StringInfo buf, const char *str);
59
61
62/* public interface functions */
65static void libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len);
66static void libpq_queue_fetch_range(rewind_source *source, const char *path,
67 off_t off, size_t len);
69static char *libpq_fetch_file(rewind_source *source, const char *path,
70 size_t *filesize);
73
74/*
75 * Create a new libpq source.
76 *
77 * The caller has already established the connection, but should not try
78 * to use it while the source is active.
79 */
82{
83 libpq_source *src;
84
86
87 src = pg_malloc0(sizeof(libpq_source));
88
96
97 src->conn = conn;
98
99 initStringInfo(&src->paths);
100 initStringInfo(&src->offsets);
101 initStringInfo(&src->lengths);
102
103 return &src->common;
104}
105
106/*
107 * Initialize a libpq connection for use.
108 */
109static void
111{
112 PGresult *res;
113 char *str;
114
115 /* disable all types of timeouts */
116 run_simple_command(conn, "SET statement_timeout = 0");
117 run_simple_command(conn, "SET lock_timeout = 0");
118 run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
119 run_simple_command(conn, "SET transaction_timeout = 0");
120
121 /*
122 * we don't intend to do any updates, put the connection in read-only mode
123 * to keep us honest
124 */
125 run_simple_command(conn, "SET default_transaction_read_only = on");
126
127 /* secure search_path */
130 pg_fatal("could not clear \"search_path\": %s",
132 PQclear(res);
133
134 /*
135 * Also check that full_page_writes is enabled. We can get torn pages if
136 * a page is modified while we read it with pg_read_binary_file(), and we
137 * rely on full page images to fix them.
138 */
139 str = run_simple_query(conn, "SHOW full_page_writes");
140 if (strcmp(str, "on") != 0)
141 pg_fatal("\"full_page_writes\" must be enabled in the source server");
142 pg_free(str);
143
144 /* Prepare a statement we'll use to fetch files */
145 res = PQprepare(conn, "fetch_chunks_stmt",
146 "SELECT path, begin,\n"
147 " pg_read_binary_file(path, begin, len, true) AS chunk\n"
148 "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
149 3, NULL);
150
152 pg_fatal("could not prepare statement to fetch file contents: %s",
154 PQclear(res);
155}
156
157/*
158 * Run a query that returns a single value.
159 *
160 * The result should be pg_free'd after use.
161 */
162static char *
163run_simple_query(PGconn *conn, const char *sql)
164{
165 PGresult *res;
166 char *result;
167
168 res = PQexec(conn, sql);
169
171 pg_fatal("error running query (%s) on source server: %s",
173
174 /* sanity check the result set */
175 if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
176 pg_fatal("unexpected result set from query");
177
178 result = pg_strdup(PQgetvalue(res, 0, 0));
179
180 PQclear(res);
181
182 return result;
183}
184
185/*
186 * Run a command.
187 *
188 * In the event of a failure, exit immediately.
189 */
190static void
192{
193 PGresult *res;
194
195 res = PQexec(conn, sql);
196
198 pg_fatal("error running query (%s) in source server: %s",
200
201 PQclear(res);
202}
203
204/*
205 * Call the pg_current_wal_insert_lsn() function in the remote system.
206 */
207static XLogRecPtr
209{
210 PGconn *conn = ((libpq_source *) source)->conn;
211 XLogRecPtr result;
212 uint32 hi;
213 uint32 lo;
214 char *val;
215
216 val = run_simple_query(conn, "SELECT pg_current_wal_insert_lsn()");
217
218 if (sscanf(val, "%X/%X", &hi, &lo) != 2)
219 pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
220
221 result = ((uint64) hi) << 32 | lo;
222
223 pg_free(val);
224
225 return result;
226}
227
228/*
229 * Get a list of all files in the data directory.
230 */
231static void
233{
234 PGconn *conn = ((libpq_source *) source)->conn;
235 PGresult *res;
236 const char *sql;
237 int i;
238
239 /*
240 * Create a recursive directory listing of the whole data directory.
241 *
242 * The WITH RECURSIVE part does most of the work. The second part gets the
243 * targets of the symlinks in pg_tblspc directory.
244 *
245 * XXX: There is no backend function to get a symbolic link's target in
246 * general, so if the admin has put any custom symbolic links in the data
247 * directory, they won't be copied correctly.
248 */
249 sql =
250 "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
251 " SELECT '' AS path, filename, size, isdir FROM\n"
252 " (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
253 " pg_stat_file(fn.filename, true) AS this\n"
254 " UNION ALL\n"
255 " SELECT parent.path || parent.filename || '/' AS path,\n"
256 " fn, this.size, this.isdir\n"
257 " FROM files AS parent,\n"
258 " pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
259 " pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
260 " WHERE parent.isdir = 't'\n"
261 ")\n"
262 "SELECT path || filename, size, isdir,\n"
263 " pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
264 "FROM files\n"
265 "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
266 " AND oid::text = files.filename\n";
267 res = PQexec(conn, sql);
268
270 pg_fatal("could not fetch file list: %s",
272
273 /* sanity check the result set */
274 if (PQnfields(res) != 4)
275 pg_fatal("unexpected result set while fetching file list");
276
277 /* Read result to local variables */
278 for (i = 0; i < PQntuples(res); i++)
279 {
280 char *path;
281 int64 filesize;
282 bool isdir;
283 char *link_target;
285
286 if (PQgetisnull(res, i, 1))
287 {
288 /*
289 * The file was removed from the server while the query was
290 * running. Ignore it.
291 */
292 continue;
293 }
294
295 path = PQgetvalue(res, i, 0);
296 filesize = atoll(PQgetvalue(res, i, 1));
297 isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
298 link_target = PQgetvalue(res, i, 3);
299
300 if (link_target[0])
301 {
302 /*
303 * In-place tablespaces are directories located in pg_tblspc/ with
304 * relative paths.
305 */
306 if (is_absolute_path(link_target))
308 else
310 }
311 else if (isdir)
313 else
315
316 callback(path, type, filesize, link_target);
317 }
318 PQclear(res);
319}
320
321/*
322 * Queue up a request to fetch a file from remote system.
323 */
324static void
326{
327 /*
328 * Truncate the target file immediately, and queue a request to fetch it
329 * from the source. If the file is small, smaller than MAX_CHUNK_SIZE,
330 * request fetching a full-sized chunk anyway, so that if the file has
331 * become larger in the source system, after we scanned the source
332 * directory, we still fetch the whole file. This only works for files up
333 * to MAX_CHUNK_SIZE, but that's good enough for small configuration files
334 * and such that are changed every now and then, but not WAL-logged. For
335 * larger files, we fetch up to the original size.
336 *
337 * Even with that mechanism, there is an inherent race condition if the
338 * file is modified at the same instant that we're copying it, so that we
339 * might copy a torn version of the file with one half from the old
340 * version and another half from the new. But pg_basebackup has the same
341 * problem, and it hasn't been a problem in practice.
342 *
343 * It might seem more natural to truncate the file later, when we receive
344 * it from the source server, but then we'd need to track which
345 * fetch-requests are for a whole file.
346 */
347 open_target_file(path, true);
349}
350
351/*
352 * Queue up a request to fetch a piece of a file from remote system.
353 */
354static void
355libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
356 size_t len)
357{
359
360 /*
361 * Does this request happen to be a continuation of the previous chunk? If
362 * so, merge it with the previous one.
363 *
364 * XXX: We use pointer equality to compare the path. That's good enough
365 * for our purposes; the caller always passes the same pointer for the
366 * same filename. If it didn't, we would fail to merge requests, but it
367 * wouldn't affect correctness.
368 */
369 if (src->num_requests > 0)
370 {
371 fetch_range_request *prev = &src->request_queue[src->num_requests - 1];
372
373 if (prev->offset + prev->length == off &&
374 prev->length < MAX_CHUNK_SIZE &&
375 prev->path == path)
376 {
377 /*
378 * Extend the previous request to cover as much of this new
379 * request as possible, without exceeding MAX_CHUNK_SIZE.
380 */
381 size_t thislen;
382
383 thislen = Min(len, MAX_CHUNK_SIZE - prev->length);
384 prev->length += thislen;
385
386 off += thislen;
387 len -= thislen;
388
389 /*
390 * Fall through to create new requests for any remaining 'len'
391 * that didn't fit in the previous chunk.
392 */
393 }
394 }
395
396 /* Divide the request into pieces of MAX_CHUNK_SIZE bytes each */
397 while (len > 0)
398 {
399 int32 thislen;
400
401 /* if the queue is full, perform all the work queued up so far */
404
405 thislen = Min(len, MAX_CHUNK_SIZE);
406 src->request_queue[src->num_requests].path = path;
407 src->request_queue[src->num_requests].offset = off;
408 src->request_queue[src->num_requests].length = thislen;
409 src->num_requests++;
410
411 off += thislen;
412 len -= thislen;
413 }
414}
415
416/*
417 * Fetch all the queued chunks and write them to the target data directory.
418 */
419static void
421{
423}
424
425static void
427{
428 const char *params[3];
429 PGresult *res;
430 int chunkno;
431
432 if (src->num_requests == 0)
433 return;
434
435 pg_log_debug("getting %d file chunks", src->num_requests);
436
437 /*
438 * The prepared statement, 'fetch_chunks_stmt', takes three arrays with
439 * the same length as parameters: paths, offsets and lengths. Construct
440 * the string representations of them.
441 */
442 resetStringInfo(&src->paths);
445
446 appendStringInfoChar(&src->paths, '{');
447 appendStringInfoChar(&src->offsets, '{');
448 appendStringInfoChar(&src->lengths, '{');
449 for (int i = 0; i < src->num_requests; i++)
450 {
452
453 if (i > 0)
454 {
455 appendStringInfoChar(&src->paths, ',');
456 appendStringInfoChar(&src->offsets, ',');
457 appendStringInfoChar(&src->lengths, ',');
458 }
459
463 }
464 appendStringInfoChar(&src->paths, '}');
465 appendStringInfoChar(&src->offsets, '}');
466 appendStringInfoChar(&src->lengths, '}');
467
468 /*
469 * Execute the prepared statement.
470 */
471 params[0] = src->paths.data;
472 params[1] = src->offsets.data;
473 params[2] = src->lengths.data;
474
475 if (PQsendQueryPrepared(src->conn, "fetch_chunks_stmt", 3, params, NULL, NULL, 1) != 1)
476 pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
477
478 if (PQsetSingleRowMode(src->conn) != 1)
479 pg_fatal("could not set libpq connection to single row mode");
480
481 /*----
482 * The result set is of format:
483 *
484 * path text -- path in the data directory, e.g "base/1/123"
485 * begin int8 -- offset within the file
486 * chunk bytea -- file content
487 *----
488 */
489 chunkno = 0;
490 while ((res = PQgetResult(src->conn)) != NULL)
491 {
492 fetch_range_request *rq = &src->request_queue[chunkno];
493 char *filename;
494 int filenamelen;
495 int64 chunkoff;
496 int chunksize;
497 char *chunk;
498
499 switch (PQresultStatus(res))
500 {
502 break;
503
504 case PGRES_TUPLES_OK:
505 PQclear(res);
506 continue; /* final zero-row result */
507
508 default:
509 pg_fatal("unexpected result while fetching remote files: %s",
511 }
512
513 if (chunkno > src->num_requests)
514 pg_fatal("received more data chunks than requested");
515
516 /* sanity check the result set */
517 if (PQnfields(res) != 3 || PQntuples(res) != 1)
518 pg_fatal("unexpected result set size while fetching remote files");
519
520 if (PQftype(res, 0) != TEXTOID ||
521 PQftype(res, 1) != INT8OID ||
522 PQftype(res, 2) != BYTEAOID)
523 {
524 pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u",
525 PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
526 }
527
528 if (PQfformat(res, 0) != 1 &&
529 PQfformat(res, 1) != 1 &&
530 PQfformat(res, 2) != 1)
531 {
532 pg_fatal("unexpected result format while fetching remote files");
533 }
534
535 if (PQgetisnull(res, 0, 0) ||
536 PQgetisnull(res, 0, 1))
537 {
538 pg_fatal("unexpected null values in result while fetching remote files");
539 }
540
541 if (PQgetlength(res, 0, 1) != sizeof(int64))
542 pg_fatal("unexpected result length while fetching remote files");
543
544 /* Read result set to local variables */
545 memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
546 chunkoff = pg_ntoh64(chunkoff);
547 chunksize = PQgetlength(res, 0, 2);
548
549 filenamelen = PQgetlength(res, 0, 0);
550 filename = pg_malloc(filenamelen + 1);
551 memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
552 filename[filenamelen] = '\0';
553
554 chunk = PQgetvalue(res, 0, 2);
555
556 /*
557 * If a file has been deleted on the source, remove it on the target
558 * as well. Note that multiple unlink() calls may happen on the same
559 * file if multiple data chunks are associated with it, hence ignore
560 * unconditionally anything missing.
561 */
562 if (PQgetisnull(res, 0, 2))
563 {
564 pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
565 filename);
567 }
568 else
569 {
570 pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
571 filename, (long long int) chunkoff, chunksize);
572
573 if (strcmp(filename, rq->path) != 0)
574 {
575 pg_fatal("received data for file \"%s\", when requested for \"%s\"",
576 filename, rq->path);
577 }
578 if (chunkoff != rq->offset)
579 pg_fatal("received data at offset %lld of file \"%s\", when requested for offset %lld",
580 (long long int) chunkoff, rq->path, (long long int) rq->offset);
581
582 /*
583 * We should not receive more data than we requested, or
584 * pg_read_binary_file() messed up. We could receive less,
585 * though, if the file was truncated in the source after we
586 * checked its size. That's OK, there should be a WAL record of
587 * the truncation, which will get replayed when you start the
588 * target system for the first time after pg_rewind has completed.
589 */
590 if (chunksize > rq->length)
591 pg_fatal("received more than requested for file \"%s\"", rq->path);
592
594
595 write_target_range(chunk, chunkoff, chunksize);
596 }
597
599
600 PQclear(res);
601 chunkno++;
602 }
603 if (chunkno != src->num_requests)
604 pg_fatal("unexpected number of data chunks received");
605
606 src->num_requests = 0;
607}
608
609/*
610 * Escape a string to be used as element in a text array constant
611 */
612static void
614{
616 while (*str)
617 {
618 char ch = *str;
619
620 if (ch == '"' || ch == '\\')
622
624
625 str++;
626 }
628}
629
630/*
631 * Fetch a single file as a malloc'd buffer.
632 */
633static char *
634libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
635{
636 PGconn *conn = ((libpq_source *) source)->conn;
637 PGresult *res;
638 char *result;
639 int len;
640 const char *paramValues[1];
641
642 paramValues[0] = path;
643 res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
644 1, NULL, paramValues, NULL, NULL, 1);
645
647 pg_fatal("could not fetch remote file \"%s\": %s",
649
650 /* sanity check the result set */
651 if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
652 pg_fatal("unexpected result set while fetching remote file \"%s\"",
653 path);
654
655 /* Read result to local variables */
656 len = PQgetlength(res, 0, 0);
657 result = pg_malloc(len + 1);
658 memcpy(result, PQgetvalue(res, 0, 0), len);
659 result[len] = '\0';
660
661 PQclear(res);
662
663 pg_log_debug("fetched file \"%s\", length %d", path, len);
664
665 if (filesize)
666 *filesize = len;
667 return result;
668}
669
670/*
671 * Close a libpq source.
672 */
673static void
675{
677
678 pfree(src->paths.data);
679 pfree(src->offsets.data);
680 pfree(src->lengths.data);
681 pfree(src);
682
683 /* NOTE: we don't close the connection here, as it was not opened by us. */
684}
#define Min(x, y)
Definition: c.h:961
#define Max(x, y)
Definition: c.h:955
#define INT64_FORMAT
Definition: c.h:506
int64_t int64
Definition: c.h:485
int32_t int32
Definition: c.h:484
uint64_t uint64
Definition: c.h:489
uint32_t uint32
Definition: c.h:488
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7268
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3887
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1948
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3719
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:2276
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3876
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQfformat(const PGresult *res, int field_num)
Definition: fe-exec.c:3708
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3481
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2306
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3427
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3901
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2262
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1633
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3489
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void pg_free(void *ptr)
Definition: fe_memutils.c:105
void remove_target_file(const char *path, bool missing_ok)
Definition: file_ops.c:187
void open_target_file(const char *path, bool trunc)
Definition: file_ops.c:47
void write_target_range(char *buf, off_t begin, size_t size)
Definition: file_ops.c:88
void(* process_file_callback_t)(const char *path, file_type_t type, size_t size, const char *link_target)
Definition: file_ops.h:26
file_type_t
Definition: filemap.h:31
@ FILE_TYPE_REGULAR
Definition: filemap.h:34
@ FILE_TYPE_SYMLINK
Definition: filemap.h:36
@ FILE_TYPE_DIRECTORY
Definition: filemap.h:35
uint64 chunk
const char * str
long val
Definition: informix.c:689
int i
Definition: isn.c:72
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:120
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:133
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:123
static void init_libpq_conn(PGconn *conn)
Definition: libpq_source.c:110
static char * libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
Definition: libpq_source.c:634
#define MAX_CHUNK_SIZE
Definition: libpq_source.c:25
static void appendArrayEscapedString(StringInfo buf, const char *str)
Definition: libpq_source.c:613
static void run_simple_command(PGconn *conn, const char *sql)
Definition: libpq_source.c:191
static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source)
Definition: libpq_source.c:208
rewind_source * init_libpq_source(PGconn *conn)
Definition: libpq_source.c:81
static void libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
Definition: libpq_source.c:232
static void libpq_destroy(rewind_source *source)
Definition: libpq_source.c:674
static void libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off, size_t len)
Definition: libpq_source.c:355
#define MAX_CHUNKS_PER_QUERY
Definition: libpq_source.c:26
static char * run_simple_query(PGconn *conn, const char *sql)
Definition: libpq_source.c:163
static void libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
Definition: libpq_source.c:325
static void libpq_finish_fetch(rewind_source *source)
Definition: libpq_source.c:420
static void process_queued_fetch_requests(libpq_source *src)
Definition: libpq_source.c:426
#define pg_log_debug(...)
Definition: logging.h:133
void pfree(void *pointer)
Definition: mcxt.c:1521
#define pg_fatal(...)
#define pg_ntoh64(x)
Definition: pg_bswap.h:126
const void size_t len
static char * filename
Definition: pg_dumpall.c:119
static rewind_source * source
Definition: pg_rewind.c:89
static char * buf
Definition: pg_test_fsync.c:72
#define is_absolute_path(filename)
Definition: port.h:103
PGconn * conn
Definition: streamutil.c:53
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:242
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
#define appendStringInfoCharMacro(str, ch)
Definition: stringinfo.h:231
const char * path
Definition: libpq_source.c:31
StringInfoData paths
Definition: libpq_source.c:50
fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY]
Definition: libpq_source.c:47
StringInfoData lengths
Definition: libpq_source.c:52
PGconn * conn
Definition: libpq_source.c:40
StringInfoData offsets
Definition: libpq_source.c:51
rewind_source common
Definition: libpq_source.c:38
void(* queue_fetch_file)(struct rewind_source *, const char *path, size_t len)
Definition: rewind_source.h:60
void(* traverse_files)(struct rewind_source *, process_file_callback_t callback)
Definition: rewind_source.h:29
void(* finish_fetch)(struct rewind_source *)
Definition: rewind_source.h:66
XLogRecPtr(* get_current_wal_insert_lsn)(struct rewind_source *)
Definition: rewind_source.h:71
void(* queue_fetch_range)(struct rewind_source *, const char *path, off_t offset, size_t len)
Definition: rewind_source.h:47
char *(* fetch_file)(struct rewind_source *, const char *path, size_t *filesize)
Definition: rewind_source.h:37
void(* destroy)(struct rewind_source *)
Definition: rewind_source.h:76
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46
const char * type
uint64 XLogRecPtr
Definition: xlogdefs.h:21