12#include "catalog/pg_type_d.h"
25#define MAX_CHUNK_SIZE (1024 * 1024)
26#define MAX_CHUNKS_PER_QUERY 1000
67 off_t off,
size_t len);
130 pg_fatal(
"could not clear \"search_path\": %s",
140 if (strcmp(
str,
"on") != 0)
141 pg_fatal(
"\"full_page_writes\" must be enabled in the source server");
146 "SELECT path, begin,\n"
147 " pg_read_binary_file(path, begin, len, true) AS chunk\n"
148 "FROM unnest ($1::text[], $2::int8[], $3::int4[]) as x(path, begin, len)",
152 pg_fatal(
"could not prepare statement to fetch file contents: %s",
171 pg_fatal(
"error running query (%s) on source server: %s",
176 pg_fatal(
"unexpected result set from query");
198 pg_fatal(
"error running query (%s) in source server: %s",
218 if (sscanf(
val,
"%X/%X", &hi, &lo) != 2)
219 pg_fatal(
"unrecognized result \"%s\" for current WAL insert location",
val);
221 result = ((
uint64) hi) << 32 | lo;
250 "WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
251 " SELECT '' AS path, filename, size, isdir FROM\n"
252 " (SELECT pg_ls_dir('.', true, false) AS filename) AS fn,\n"
253 " pg_stat_file(fn.filename, true) AS this\n"
255 " SELECT parent.path || parent.filename || '/' AS path,\n"
256 " fn, this.size, this.isdir\n"
257 " FROM files AS parent,\n"
258 " pg_ls_dir(parent.path || parent.filename, true, false) AS fn,\n"
259 " pg_stat_file(parent.path || parent.filename || '/' || fn, true) AS this\n"
260 " WHERE parent.isdir = 't'\n"
262 "SELECT path || filename, size, isdir,\n"
263 " pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
265 "LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
266 " AND oid::text = files.filename\n";
270 pg_fatal(
"could not fetch file list: %s",
275 pg_fatal(
"unexpected result set while fetching file list");
428 const char *params[3];
479 pg_fatal(
"could not set libpq connection to single row mode");
509 pg_fatal(
"unexpected result while fetching remote files: %s",
514 pg_fatal(
"received more data chunks than requested");
518 pg_fatal(
"unexpected result set size while fetching remote files");
524 pg_fatal(
"unexpected data types in result set while fetching remote files: %u %u %u",
532 pg_fatal(
"unexpected result format while fetching remote files");
538 pg_fatal(
"unexpected null values in result while fetching remote files");
542 pg_fatal(
"unexpected result length while fetching remote files");
564 pg_log_debug(
"received null value for chunk for file \"%s\", file has been deleted",
570 pg_log_debug(
"received chunk for file \"%s\", offset %lld, size %d",
571 filename, (
long long int) chunkoff, chunksize);
575 pg_fatal(
"received data for file \"%s\", when requested for \"%s\"",
578 if (chunkoff != rq->
offset)
579 pg_fatal(
"received data at offset %lld of file \"%s\", when requested for offset %lld",
580 (
long long int) chunkoff, rq->
path, (
long long int) rq->
offset);
590 if (chunksize > rq->
length)
591 pg_fatal(
"received more than requested for file \"%s\"", rq->
path);
604 pg_fatal(
"unexpected number of data chunks received");
620 if (ch ==
'"' || ch ==
'\\')
640 const char *paramValues[1];
642 paramValues[0] = path;
644 1, NULL, paramValues, NULL, NULL, 1);
647 pg_fatal(
"could not fetch remote file \"%s\": %s",
652 pg_fatal(
"unexpected result set while fetching remote file \"%s\"",
#define ALWAYS_SECURE_SEARCH_PATH_SQL
static void PGresult * res
char * PQerrorMessage(const PGconn *conn)
int PQgetlength(const PGresult *res, int tup_num, int field_num)
int PQsetSingleRowMode(PGconn *conn)
Oid PQftype(const PGresult *res, int field_num)
PGresult * PQexecParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
PGresult * PQgetResult(PGconn *conn)
ExecStatusType PQresultStatus(const PGresult *res)
int PQfformat(const PGresult *res, int field_num)
int PQntuples(const PGresult *res)
PGresult * PQprepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes)
char * PQresultErrorMessage(const PGresult *res)
int PQgetisnull(const PGresult *res, int tup_num, int field_num)
PGresult * PQexec(PGconn *conn, const char *query)
int PQsendQueryPrepared(PGconn *conn, const char *stmtName, int nParams, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
int PQnfields(const PGresult *res)
void * pg_malloc(size_t size)
char * pg_strdup(const char *in)
void * pg_malloc0(size_t size)
void remove_target_file(const char *path, bool missing_ok)
void open_target_file(const char *path, bool trunc)
void write_target_range(char *buf, off_t begin, size_t size)
void(* process_file_callback_t)(const char *path, file_type_t type, size_t size, const char *link_target)
static void init_libpq_conn(PGconn *conn)
static char * libpq_fetch_file(rewind_source *source, const char *path, size_t *filesize)
static void appendArrayEscapedString(StringInfo buf, const char *str)
static void run_simple_command(PGconn *conn, const char *sql)
static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source)
rewind_source * init_libpq_source(PGconn *conn)
static void libpq_traverse_files(rewind_source *source, process_file_callback_t callback)
static void libpq_destroy(rewind_source *source)
static void libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off, size_t len)
#define MAX_CHUNKS_PER_QUERY
static char * run_simple_query(PGconn *conn, const char *sql)
static void libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
static void libpq_finish_fetch(rewind_source *source)
static void process_queued_fetch_requests(libpq_source *src)
#define pg_log_debug(...)
void pfree(void *pointer)
static rewind_source * source
#define is_absolute_path(filename)
void resetStringInfo(StringInfo str)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
#define appendStringInfoCharMacro(str, ch)
fetch_range_request request_queue[MAX_CHUNKS_PER_QUERY]
void(* queue_fetch_file)(struct rewind_source *, const char *path, size_t len)
void(* traverse_files)(struct rewind_source *, process_file_callback_t callback)
void(* finish_fetch)(struct rewind_source *)
XLogRecPtr(* get_current_wal_insert_lsn)(struct rewind_source *)
void(* queue_fetch_range)(struct rewind_source *, const char *path, off_t offset, size_t len)
char *(* fetch_file)(struct rewind_source *, const char *path, size_t *filesize)
void(* destroy)(struct rewind_source *)
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)