12 #include "catalog/pg_type_d.h"
26 #define MAX_CHUNK_SIZE (1024 * 1024)
27 #define MAX_CHUNKS_PER_QUERY 1000
68 off_t off,
size_t len);
131 pg_fatal(
"could not clear \"search_path\": %s",
141 if (strcmp(
str,
"on") != 0)
142 pg_fatal(
"\"full_page_writes\" must be enabled in the source server");
147 "SELECT path, begin,\n"
148 " pg_read_binary_file(path, begin, len, true) AS chunk\n"
149 "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
153 pg_fatal(
"could not prepare statement to fetch file contents: %s",
172 pg_fatal(
"error running query (%s) on source server: %s",
177 pg_fatal(
"unexpected result set from query");
199 pg_fatal(
"error running query (%s) in source server: %s",
219 if (sscanf(
val,
"%X/%X", &hi, &lo) != 2)
220 pg_fatal(
"unrecognized result \"%s\" for current WAL insert location",
val);
222 result = ((uint64) hi) << 32 | lo;
251 "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
252 " SELECT '' AS path, filename, size, isdir FROM\n"
253 " (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
254 " pg_stat_file(fn.filename, true) AS this\n"
256 " SELECT parent.path || parent.filename || '/' AS path,\n"
257 " fn, this.size, this.isdir\n"
258 " FROM files AS parent,\n"
259 " pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
260 " pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
261 " WHERE parent.isdir = 't'\n"
263 "SELECT path || filename, size, isdir,\n"
264 " pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
266 "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
267 " AND oid::text = files.filename\n";
271 pg_fatal(
"could not fetch file list: %s",
276 pg_fatal(
"unexpected result set while fetching file list");
429 const char *params[3];
480 pg_fatal(
"could not set libpq connection to single row mode");
510 pg_fatal(
"unexpected result while fetching remote files: %s",
515 pg_fatal(
"received more data chunks than requested");
519 pg_fatal(
"unexpected result set size while fetching remote files");
525 pg_fatal(
"unexpected data types in result set while fetching remote files: %u %u %u",
533 pg_fatal(
"unexpected result format while fetching remote files");
539 pg_fatal(
"unexpected null values in result while fetching remote files");
543 pg_fatal(
"unexpected result length while fetching remote files");
565 pg_log_debug(
"received null value for chunk for file \"%s\", file has been deleted",
571 pg_log_debug(
"received chunk for file \"%s\", offset %lld, size %d",
572 filename, (
long long int) chunkoff, chunksize);
576 pg_fatal(
"received data for file \"%s\", when requested for \"%s\"",
579 if (chunkoff != rq->
offset)
580 pg_fatal(
"received data at offset %lld of file \"%s\", when requested for offset %lld",
581 (
long long int) chunkoff, rq->
path, (
long long int) rq->
offset);
591 if (chunksize > rq->
length)
592 pg_fatal(
"received more than requested for file \"%s\"", rq->
path);
605 pg_fatal(
"unexpected number of data chunks received");
621 if (ch ==
'"' || ch ==
'\\')
641 const char *paramValues[1];
643 paramValues[0] = path;
645 1, NULL, paramValues, NULL, NULL, 1);
648 pg_fatal(
"could not fetch remote file \"%s\": %s",
653 pg_fatal(
"unexpected result set while fetching remote file \"%s\"",
#define ALWAYS_SECURE_SEARCH_PATH_SQL
static void PGresult * res
char * PQerrorMessage(const PGconn *conn)
int PQgetlength(const PGresult *res, int tup_num, int field_num)
int PQsetSingleRowMode(PGconn *conn)
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
Oid PQftype(const PGresult *res, int field_num)
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
ExecStatusType PQresultStatus(const PGresult *res)
char * PQresultErrorMessage(const PGresult *res)
int PQfformat(const PGresult *res, int field_num)
int PQntuples(const PGresult *res)
PGresult * PQexec(PGconn *conn, const char *query)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
int PQnfields(const PGresult *res)
PGresult * PQgetResult(PGconn *conn)
void * pg_malloc0(size_t size)
char * pg_strdup(const char *in)
void * pg_malloc(size_t size)
void remove_target_file(const char *path, bool missing_ok)
void open_target_file(const char *path, bool trunc)
void write_target_range(char *buf, off_t begin, size_t size)
void(* process_file_callback_t)(const char *path, file_type_t type, size_t size, const char *link_target)
static void init_libpq_conn(PGconn *conn)
static char * libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
static void appendArrayEscapedString(StringInfo buf, const char *str)
static void run_simple_command(PGconn *conn, const char *sql)
rewind_source * init_libpq_source(PGconn *conn)
static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source)
static void libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
static void libpq_destroy(rewind_source *source)
static void libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off, size_t len)
#define MAX_CHUNKS_PER_QUERY
static char * run_simple_query(PGconn *conn, const char *sql)
static void libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
static void libpq_finish_fetch(rewind_source *source)
static void process_queued_fetch_requests(libpq_source *src)
#define pg_log_debug(...)
void pfree(void *pointer)
static rewind_source * source
#define is_absolute_path(filename)
void resetStringInfo(StringInfo str)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
#define appendStringInfoCharMacro(str, ch)
fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY]
void(* queue_fetch_file)(struct rewind_source *, const char *path, size_t len)
void(* traverse_files)(struct rewind_source *, process_file_callback_t callback)
void(* finish_fetch)(struct rewind_source *)
XLogRecPtr(* get_current_wal_insert_lsn)(struct rewind_source *)
void(* queue_fetch_range)(struct rewind_source *, const char *path, off_t offset, size_t len)
char *(* fetch_file)(struct rewind_source *, const char *path, size_t *filesize)
void(* destroy)(struct rewind_source *)
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)