PostgreSQL Source Code  git master
libpq_source.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * libpq_source.c
4  * Functions for fetching files from a remote server via libpq.
5  *
6  * Copyright (c) 2013-2024, PostgreSQL Global Development Group
7  *
8  *-------------------------------------------------------------------------
9  */
10 #include "postgres_fe.h"
11 
12 #include "catalog/pg_type_d.h"
13 #include "common/connect.h"
14 #include "datapagemap.h"
15 #include "file_ops.h"
16 #include "filemap.h"
17 #include "lib/stringinfo.h"
18 #include "pg_rewind.h"
19 #include "port/pg_bswap.h"
20 #include "rewind_source.h"
21 
22 /*
23  * Files are fetched MAX_CHUNK_SIZE bytes at a time, and with a
24  * maximum of MAX_CHUNKS_PER_QUERY chunks in a single query.
25  */
26 #define MAX_CHUNK_SIZE (1024 * 1024)
27 #define MAX_CHUNKS_PER_QUERY 1000
28 
29 /* represents a request to fetch a piece of a file from the source */
30 typedef struct
31 {
32  const char *path; /* path relative to data directory root */
33  off_t offset;
34  size_t length;
36 
37 typedef struct
38 {
39  rewind_source common; /* common interface functions */
40 
42 
43  /*
44  * Queue of chunks that have been requested with the queue_fetch_range()
45  * function, but have not been fetched from the remote server yet.
46  */
49 
50  /* temporary space for process_queued_fetch_requests() */
54 } libpq_source;
55 
56 static void init_libpq_conn(PGconn *conn);
57 static char *run_simple_query(PGconn *conn, const char *sql);
58 static void run_simple_command(PGconn *conn, const char *sql);
59 static void appendArrayEscapedString(StringInfo buf, const char *str);
60 
62 
63 /* public interface functions */
66 static void libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len);
67 static void libpq_queue_fetch_range(rewind_source *source, const char *path,
68  off_t off, size_t len);
70 static char *libpq_fetch_file(rewind_source *source, const char *path,
71  size_t *filesize);
73 static void libpq_destroy(rewind_source *source);
74 
75 /*
76  * Create a new libpq source.
77  *
78  * The caller has already established the connection, but should not try
79  * to use it while the source is active.
80  */
83 {
84  libpq_source *src;
85 
87 
88  src = pg_malloc0(sizeof(libpq_source));
89 
97 
98  src->conn = conn;
99 
100  initStringInfo(&src->paths);
101  initStringInfo(&src->offsets);
102  initStringInfo(&src->lengths);
103 
104  return &src->common;
105 }
106 
107 /*
108  * Initialize a libpq connection for use.
109  */
110 static void
112 {
113  PGresult *res;
114  char *str;
115 
116  /* disable all types of timeouts */
117  run_simple_command(conn, "SET statement_timeout = 0");
118  run_simple_command(conn, "SET lock_timeout = 0");
119  run_simple_command(conn, "SET idle_in_transaction_session_timeout = 0");
120  run_simple_command(conn, "SET transaction_timeout = 0");
121 
122  /*
123  * we don't intend to do any updates, put the connection in read-only mode
124  * to keep us honest
125  */
126  run_simple_command(conn, "SET default_transaction_read_only = on");
127 
128  /* secure search_path */
131  pg_fatal("could not clear search_path: %s",
133  PQclear(res);
134 
135  /*
136  * Also check that full_page_writes is enabled. We can get torn pages if
137  * a page is modified while we read it with pg_read_binary_file(), and we
138  * rely on full page images to fix them.
139  */
140  str = run_simple_query(conn, "SHOW full_page_writes");
141  if (strcmp(str, "on") != 0)
142  pg_fatal("full_page_writes must be enabled in the source server");
143  pg_free(str);
144 
145  /* Prepare a statement we'll use to fetch files */
146  res = PQprepare(conn, "fetch_chunks_stmt",
147  "SELECT path, begin,\n"
148  " pg_read_binary_file(path, begin, len, true) AS chunk\n"
149  "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
150  3, NULL);
151 
153  pg_fatal("could not prepare statement to fetch file contents: %s",
155  PQclear(res);
156 }
157 
158 /*
159  * Run a query that returns a single value.
160  *
161  * The result should be pg_free'd after use.
162  */
163 static char *
164 run_simple_query(PGconn *conn, const char *sql)
165 {
166  PGresult *res;
167  char *result;
168 
169  res = PQexec(conn, sql);
170 
172  pg_fatal("error running query (%s) on source server: %s",
173  sql, PQresultErrorMessage(res));
174 
175  /* sanity check the result set */
176  if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
177  pg_fatal("unexpected result set from query");
178 
179  result = pg_strdup(PQgetvalue(res, 0, 0));
180 
181  PQclear(res);
182 
183  return result;
184 }
185 
186 /*
187  * Run a command.
188  *
189  * In the event of a failure, exit immediately.
190  */
191 static void
192 run_simple_command(PGconn *conn, const char *sql)
193 {
194  PGresult *res;
195 
196  res = PQexec(conn, sql);
197 
199  pg_fatal("error running query (%s) in source server: %s",
200  sql, PQresultErrorMessage(res));
201 
202  PQclear(res);
203 }
204 
205 /*
206  * Call the pg_current_wal_insert_lsn() function in the remote system.
207  */
208 static XLogRecPtr
210 {
211  PGconn *conn = ((libpq_source *) source)->conn;
212  XLogRecPtr result;
213  uint32 hi;
214  uint32 lo;
215  char *val;
216 
217  val = run_simple_query(conn, "SELECT pg_current_wal_insert_lsn()");
218 
219  if (sscanf(val, "%X/%X", &hi, &lo) != 2)
220  pg_fatal("unrecognized result \"%s\" for current WAL insert location", val);
221 
222  result = ((uint64) hi) << 32 | lo;
223 
224  pg_free(val);
225 
226  return result;
227 }
228 
229 /*
230  * Get a list of all files in the data directory.
231  */
232 static void
234 {
235  PGconn *conn = ((libpq_source *) source)->conn;
236  PGresult *res;
237  const char *sql;
238  int i;
239 
240  /*
241  * Create a recursive directory listing of the whole data directory.
242  *
243  * The WITH RECURSIVE part does most of the work. The second part gets the
244  * targets of the symlinks in pg_tblspc directory.
245  *
246  * XXX: There is no backend function to get a symbolic link's target in
247  * general, so if the admin has put any custom symbolic links in the data
248  * directory, they won't be copied correctly.
249  */
250  sql =
251  "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
252  " SELECT '' AS path, filename, size, isdir FROM\n"
253  " (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
254  " pg_stat_file(fn.filename, true) AS this\n"
255  " UNION ALL\n"
256  " SELECT parent.path || parent.filename || '/' AS path,\n"
257  " fn, this.size, this.isdir\n"
258  " FROM files AS parent,\n"
259  " pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
260  " pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
261  " WHERE parent.isdir = 't'\n"
262  ")\n"
263  "SELECT path || filename, size, isdir,\n"
264  " pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
265  "FROM files\n"
266  "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
267  " AND oid::text = files.filename\n";
268  res = PQexec(conn, sql);
269 
271  pg_fatal("could not fetch file list: %s",
273 
274  /* sanity check the result set */
275  if (PQnfields(res) != 4)
276  pg_fatal("unexpected result set while fetching file list");
277 
278  /* Read result to local variables */
279  for (i = 0; i < PQntuples(res); i++)
280  {
281  char *path;
282  int64 filesize;
283  bool isdir;
284  char *link_target;
286 
287  if (PQgetisnull(res, i, 1))
288  {
289  /*
290  * The file was removed from the server while the query was
291  * running. Ignore it.
292  */
293  continue;
294  }
295 
296  path = PQgetvalue(res, i, 0);
297  filesize = atol(PQgetvalue(res, i, 1));
298  isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
299  link_target = PQgetvalue(res, i, 3);
300 
301  if (link_target[0])
302  {
303  /*
304  * In-place tablespaces are directories located in pg_tblspc/ with
305  * relative paths.
306  */
307  if (is_absolute_path(link_target))
309  else
311  }
312  else if (isdir)
314  else
316 
317  callback(path, type, filesize, link_target);
318  }
319  PQclear(res);
320 }
321 
322 /*
323  * Queue up a request to fetch a file from remote system.
324  */
325 static void
326 libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
327 {
328  /*
329  * Truncate the target file immediately, and queue a request to fetch it
330  * from the source. If the file is small, smaller than MAX_CHUNK_SIZE,
331  * request fetching a full-sized chunk anyway, so that if the file has
332  * become larger in the source system, after we scanned the source
333  * directory, we still fetch the whole file. This only works for files up
334  * to MAX_CHUNK_SIZE, but that's good enough for small configuration files
335  * and such that are changed every now and then, but not WAL-logged. For
336  * larger files, we fetch up to the original size.
337  *
338  * Even with that mechanism, there is an inherent race condition if the
339  * file is modified at the same instant that we're copying it, so that we
340  * might copy a torn version of the file with one half from the old
341  * version and another half from the new. But pg_basebackup has the same
342  * problem, and it hasn't been a problem in practice.
343  *
344  * It might seem more natural to truncate the file later, when we receive
345  * it from the source server, but then we'd need to track which
346  * fetch-requests are for a whole file.
347  */
348  open_target_file(path, true);
350 }
351 
352 /*
353  * Queue up a request to fetch a piece of a file from remote system.
354  */
355 static void
356 libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
357  size_t len)
358 {
359  libpq_source *src = (libpq_source *) source;
360 
361  /*
362  * Does this request happen to be a continuation of the previous chunk? If
363  * so, merge it with the previous one.
364  *
365  * XXX: We use pointer equality to compare the path. That's good enough
366  * for our purposes; the caller always passes the same pointer for the
367  * same filename. If it didn't, we would fail to merge requests, but it
368  * wouldn't affect correctness.
369  */
370  if (src->num_requests > 0)
371  {
372  fetch_range_request *prev = &src->request_queue[src->num_requests - 1];
373 
374  if (prev->offset + prev->length == off &&
375  prev->length < MAX_CHUNK_SIZE &&
376  prev->path == path)
377  {
378  /*
379  * Extend the previous request to cover as much of this new
380  * request as possible, without exceeding MAX_CHUNK_SIZE.
381  */
382  size_t thislen;
383 
384  thislen = Min(len, MAX_CHUNK_SIZE - prev->length);
385  prev->length += thislen;
386 
387  off += thislen;
388  len -= thislen;
389 
390  /*
391  * Fall through to create new requests for any remaining 'len'
392  * that didn't fit in the previous chunk.
393  */
394  }
395  }
396 
397  /* Divide the request into pieces of MAX_CHUNK_SIZE bytes each */
398  while (len > 0)
399  {
400  int32 thislen;
401 
402  /* if the queue is full, perform all the work queued up so far */
405 
406  thislen = Min(len, MAX_CHUNK_SIZE);
407  src->request_queue[src->num_requests].path = path;
408  src->request_queue[src->num_requests].offset = off;
409  src->request_queue[src->num_requests].length = thislen;
410  src->num_requests++;
411 
412  off += thislen;
413  len -= thislen;
414  }
415 }
416 
417 /*
418  * Fetch all the queued chunks and write them to the target data directory.
419  */
420 static void
422 {
424 }
425 
426 static void
428 {
429  const char *params[3];
430  PGresult *res;
431  int chunkno;
432 
433  if (src->num_requests == 0)
434  return;
435 
436  pg_log_debug("getting %d file chunks", src->num_requests);
437 
438  /*
439  * The prepared statement, 'fetch_chunks_stmt', takes three arrays with
440  * the same length as parameters: paths, offsets and lengths. Construct
441  * the string representations of them.
442  */
443  resetStringInfo(&src->paths);
444  resetStringInfo(&src->offsets);
445  resetStringInfo(&src->lengths);
446 
447  appendStringInfoChar(&src->paths, '{');
448  appendStringInfoChar(&src->offsets, '{');
449  appendStringInfoChar(&src->lengths, '{');
450  for (int i = 0; i < src->num_requests; i++)
451  {
452  fetch_range_request *rq = &src->request_queue[i];
453 
454  if (i > 0)
455  {
456  appendStringInfoChar(&src->paths, ',');
457  appendStringInfoChar(&src->offsets, ',');
458  appendStringInfoChar(&src->lengths, ',');
459  }
460 
461  appendArrayEscapedString(&src->paths, rq->path);
462  appendStringInfo(&src->offsets, INT64_FORMAT, (int64) rq->offset);
463  appendStringInfo(&src->lengths, INT64_FORMAT, (int64) rq->length);
464  }
465  appendStringInfoChar(&src->paths, '}');
466  appendStringInfoChar(&src->offsets, '}');
467  appendStringInfoChar(&src->lengths, '}');
468 
469  /*
470  * Execute the prepared statement.
471  */
472  params[0] = src->paths.data;
473  params[1] = src->offsets.data;
474  params[2] = src->lengths.data;
475 
476  if (PQsendQueryPrepared(src->conn, "fetch_chunks_stmt", 3, params, NULL, NULL, 1) != 1)
477  pg_fatal("could not send query: %s", PQerrorMessage(src->conn));
478 
479  if (PQsetSingleRowMode(src->conn) != 1)
480  pg_fatal("could not set libpq connection to single row mode");
481 
482  /*----
483  * The result set is of format:
484  *
485  * path text -- path in the data directory, e.g "base/1/123"
486  * begin int8 -- offset within the file
487  * chunk bytea -- file content
488  *----
489  */
490  chunkno = 0;
491  while ((res = PQgetResult(src->conn)) != NULL)
492  {
493  fetch_range_request *rq = &src->request_queue[chunkno];
494  char *filename;
495  int filenamelen;
496  int64 chunkoff;
497  int chunksize;
498  char *chunk;
499 
500  switch (PQresultStatus(res))
501  {
502  case PGRES_SINGLE_TUPLE:
503  break;
504 
505  case PGRES_TUPLES_OK:
506  PQclear(res);
507  continue; /* final zero-row result */
508 
509  default:
510  pg_fatal("unexpected result while fetching remote files: %s",
512  }
513 
514  if (chunkno > src->num_requests)
515  pg_fatal("received more data chunks than requested");
516 
517  /* sanity check the result set */
518  if (PQnfields(res) != 3 || PQntuples(res) != 1)
519  pg_fatal("unexpected result set size while fetching remote files");
520 
521  if (PQftype(res, 0) != TEXTOID ||
522  PQftype(res, 1) != INT8OID ||
523  PQftype(res, 2) != BYTEAOID)
524  {
525  pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u",
526  PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
527  }
528 
529  if (PQfformat(res, 0) != 1 &&
530  PQfformat(res, 1) != 1 &&
531  PQfformat(res, 2) != 1)
532  {
533  pg_fatal("unexpected result format while fetching remote files");
534  }
535 
536  if (PQgetisnull(res, 0, 0) ||
537  PQgetisnull(res, 0, 1))
538  {
539  pg_fatal("unexpected null values in result while fetching remote files");
540  }
541 
542  if (PQgetlength(res, 0, 1) != sizeof(int64))
543  pg_fatal("unexpected result length while fetching remote files");
544 
545  /* Read result set to local variables */
546  memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int64));
547  chunkoff = pg_ntoh64(chunkoff);
548  chunksize = PQgetlength(res, 0, 2);
549 
550  filenamelen = PQgetlength(res, 0, 0);
551  filename = pg_malloc(filenamelen + 1);
552  memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
553  filename[filenamelen] = '\0';
554 
555  chunk = PQgetvalue(res, 0, 2);
556 
557  /*
558  * If a file has been deleted on the source, remove it on the target
559  * as well. Note that multiple unlink() calls may happen on the same
560  * file if multiple data chunks are associated with it, hence ignore
561  * unconditionally anything missing.
562  */
563  if (PQgetisnull(res, 0, 2))
564  {
565  pg_log_debug("received null value for chunk for file \"%s\", file has been deleted",
566  filename);
568  }
569  else
570  {
571  pg_log_debug("received chunk for file \"%s\", offset %lld, size %d",
572  filename, (long long int) chunkoff, chunksize);
573 
574  if (strcmp(filename, rq->path) != 0)
575  {
576  pg_fatal("received data for file \"%s\", when requested for \"%s\"",
577  filename, rq->path);
578  }
579  if (chunkoff != rq->offset)
580  pg_fatal("received data at offset %lld of file \"%s\", when requested for offset %lld",
581  (long long int) chunkoff, rq->path, (long long int) rq->offset);
582 
583  /*
584  * We should not receive more data than we requested, or
585  * pg_read_binary_file() messed up. We could receive less,
586  * though, if the file was truncated in the source after we
587  * checked its size. That's OK, there should be a WAL record of
588  * the truncation, which will get replayed when you start the
589  * target system for the first time after pg_rewind has completed.
590  */
591  if (chunksize > rq->length)
592  pg_fatal("received more than requested for file \"%s\"", rq->path);
593 
594  open_target_file(filename, false);
595 
596  write_target_range(chunk, chunkoff, chunksize);
597  }
598 
599  pg_free(filename);
600 
601  PQclear(res);
602  chunkno++;
603  }
604  if (chunkno != src->num_requests)
605  pg_fatal("unexpected number of data chunks received");
606 
607  src->num_requests = 0;
608 }
609 
610 /*
611  * Escape a string to be used as element in a text array constant
612  */
613 static void
615 {
617  while (*str)
618  {
619  char ch = *str;
620 
621  if (ch == '"' || ch == '\\')
623 
625 
626  str++;
627  }
629 }
630 
631 /*
632  * Fetch a single file as a malloc'd buffer.
633  */
634 static char *
635 libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
636 {
637  PGconn *conn = ((libpq_source *) source)->conn;
638  PGresult *res;
639  char *result;
640  int len;
641  const char *paramValues[1];
642 
643  paramValues[0] = path;
644  res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
645  1, NULL, paramValues, NULL, NULL, 1);
646 
648  pg_fatal("could not fetch remote file \"%s\": %s",
649  path, PQresultErrorMessage(res));
650 
651  /* sanity check the result set */
652  if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
653  pg_fatal("unexpected result set while fetching remote file \"%s\"",
654  path);
655 
656  /* Read result to local variables */
657  len = PQgetlength(res, 0, 0);
658  result = pg_malloc(len + 1);
659  memcpy(result, PQgetvalue(res, 0, 0), len);
660  result[len] = '\0';
661 
662  PQclear(res);
663 
664  pg_log_debug("fetched file \"%s\", length %d", path, len);
665 
666  if (filesize)
667  *filesize = len;
668  return result;
669 }
670 
671 /*
672  * Close a libpq source.
673  */
674 static void
676 {
677  libpq_source *src = (libpq_source *) source;
678 
679  pfree(src->paths.data);
680  pfree(src->offsets.data);
681  pfree(src->lengths.data);
682  pfree(src);
683 
684  /* NOTE: we don't close the connection here, as it was not opened by us. */
685 }
unsigned int uint32
Definition: c.h:493
#define Min(x, y)
Definition: c.h:991
signed int int32
Definition: c.h:481
#define Max(x, y)
Definition: c.h:985
#define INT64_FORMAT
Definition: c.h:535
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6948
int PQgetlength(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3847
int PQsetSingleRowMode(PGconn *conn)
Definition: fe-exec.c:1932
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Definition: fe-exec.c:2268
Oid PQftype(const PGresult *res, int field_num)
Definition: fe-exec.c:3679
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:2238
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3371
char * PQresultErrorMessage(const PGresult *res)
Definition: fe-exec.c:3387
int PQfformat(const PGresult *res, int field_num)
Definition: fe-exec.c:3668
int PQntuples(const PGresult *res)
Definition: fe-exec.c:3441
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2224
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3836
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
Definition: fe-exec.c:3861
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1642
int PQnfields(const PGresult *res)
Definition: fe-exec.c:3449
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2038
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
void remove_target_file(const char *path, bool missing_ok)
Definition: file_ops.c:187
void open_target_file(const char *path, bool trunc)
Definition: file_ops.c:47
void write_target_range(char *buf, off_t begin, size_t size)
Definition: file_ops.c:88
void(* process_file_callback_t)(const char *path, file_type_t type, size_t size, const char *link_target)
Definition: file_ops.h:26
file_type_t
Definition: filemap.h:31
@ FILE_TYPE_REGULAR
Definition: filemap.h:34
@ FILE_TYPE_SYMLINK
Definition: filemap.h:36
@ FILE_TYPE_DIRECTORY
Definition: filemap.h:35
long val
Definition: informix.c:664
int i
Definition: isn.c:73
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:100
@ PGRES_SINGLE_TUPLE
Definition: libpq-fe.h:113
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:103
static void init_libpq_conn(PGconn *conn)
Definition: libpq_source.c:111
static char * libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
Definition: libpq_source.c:635
#define MAX_CHUNK_SIZE
Definition: libpq_source.c:26
static void appendArrayEscapedString(StringInfo buf, const char *str)
Definition: libpq_source.c:614
static void run_simple_command(PGconn *conn, const char *sql)
Definition: libpq_source.c:192
rewind_source * init_libpq_source(PGconn *conn)
Definition: libpq_source.c:82
static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source)
Definition: libpq_source.c:209
static void libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
Definition: libpq_source.c:233
static void libpq_destroy(rewind_source *source)
Definition: libpq_source.c:675
static void libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off, size_t len)
Definition: libpq_source.c:356
#define MAX_CHUNKS_PER_QUERY
Definition: libpq_source.c:27
static char * run_simple_query(PGconn *conn, const char *sql)
Definition: libpq_source.c:164
static void libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
Definition: libpq_source.c:326
static void libpq_finish_fetch(rewind_source *source)
Definition: libpq_source.c:421
static void process_queued_fetch_requests(libpq_source *src)
Definition: libpq_source.c:427
#define pg_log_debug(...)
Definition: logging.h:133
void pfree(void *pointer)
Definition: mcxt.c:1508
#define pg_fatal(...)
#define pg_ntoh64(x)
Definition: pg_bswap.h:126
const void size_t len
static char * filename
Definition: pg_dumpall.c:121
static rewind_source * source
Definition: pg_rewind.c:89
static char * buf
Definition: pg_test_fsync.c:73
#define is_absolute_path(filename)
Definition: port.h:103
PGconn * conn
Definition: streamutil.c:54
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:78
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:97
void appendStringInfoChar(StringInfo str, char ch)
Definition: stringinfo.c:194
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
#define appendStringInfoCharMacro(str, ch)
Definition: stringinfo.h:204
const char * path
Definition: libpq_source.c:32
StringInfoData paths
Definition: libpq_source.c:51
fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY]
Definition: libpq_source.c:48
StringInfoData lengths
Definition: libpq_source.c:53
PGconn * conn
Definition: libpq_source.c:41
StringInfoData offsets
Definition: libpq_source.c:52
rewind_source common
Definition: libpq_source.c:39
void(* queue_fetch_file)(struct rewind_source *, const char *path, size_t len)
Definition: rewind_source.h:60
void(* traverse_files)(struct rewind_source *, process_file_callback_t callback)
Definition: rewind_source.h:29
void(* finish_fetch)(struct rewind_source *)
Definition: rewind_source.h:66
XLogRecPtr(* get_current_wal_insert_lsn)(struct rewind_source *)
Definition: rewind_source.h:71
void(* queue_fetch_range)(struct rewind_source *, const char *path, off_t offset, size_t len)
Definition: rewind_source.h:47
char *(* fetch_file)(struct rewind_source *, const char *path, size_t *filesize)
Definition: rewind_source.h:37
void(* destroy)(struct rewind_source *)
Definition: rewind_source.h:76
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46
const char * type
uint64 XLogRecPtr
Definition: xlogdefs.h:21