PostgreSQL Source Code  git master
local_source.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * local_source.c
4  * Functions for using a local data directory as the source.
5  *
6  * Portions Copyright (c) 2013-2024, PostgreSQL Global Development Group
7  *
8  *-------------------------------------------------------------------------
9  */
10 #include "postgres_fe.h"
11 
12 #include <fcntl.h>
13 #include <unistd.h>
14 
15 #include "common/logging.h"
16 #include "file_ops.h"
17 #include "rewind_source.h"
18 
19 typedef struct
20 {
21  rewind_source common; /* common interface functions */
22 
23  const char *datadir; /* path to the source data directory */
24 } local_source;
25 
28 static char *local_fetch_file(rewind_source *source, const char *path,
29  size_t *filesize);
30 static void local_queue_fetch_file(rewind_source *source, const char *path,
31  size_t len);
32 static void local_queue_fetch_range(rewind_source *source, const char *path,
33  off_t off, size_t len);
35 static void local_destroy(rewind_source *source);
36 
39 {
40  local_source *src;
41 
42  src = pg_malloc0(sizeof(local_source));
43 
51 
52  src->datadir = datadir;
53 
54  return &src->common;
55 }
56 
57 static void
59 {
61 }
62 
63 static char *
64 local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
65 {
66  return slurpFile(((local_source *) source)->datadir, path, filesize);
67 }
68 
69 /*
70  * Copy a file from source to target.
71  *
72  * 'len' is the expected length of the file.
73  */
74 static void
75 local_queue_fetch_file(rewind_source *source, const char *path, size_t len)
76 {
77  const char *datadir = ((local_source *) source)->datadir;
79  char srcpath[MAXPGPATH];
80  int srcfd;
81  size_t written_len;
82 
83  snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
84 
85  /* Open source file for reading */
86  srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
87  if (srcfd < 0)
88  pg_fatal("could not open source file \"%s\": %m",
89  srcpath);
90 
91  /* Truncate and open the target file for writing */
92  open_target_file(path, true);
93 
94  written_len = 0;
95  for (;;)
96  {
97  ssize_t read_len;
98 
99  read_len = read(srcfd, buf.data, sizeof(buf));
100 
101  if (read_len < 0)
102  pg_fatal("could not read file \"%s\": %m", srcpath);
103  else if (read_len == 0)
104  break; /* EOF reached */
105 
106  write_target_range(buf.data, written_len, read_len);
107  written_len += read_len;
108  }
109 
110  /*
111  * A local source is not expected to change while we're rewinding, so
112  * check that the size of the file matches our earlier expectation.
113  */
114  if (written_len != len)
115  pg_fatal("size of source file \"%s\" changed concurrently: %d bytes expected, %d copied",
116  srcpath, (int) len, (int) written_len);
117 
118  if (close(srcfd) != 0)
119  pg_fatal("could not close file \"%s\": %m", srcpath);
120 }
121 
122 /*
123  * Copy a file from source to target, starting at 'off', for 'len' bytes.
124  */
125 static void
126 local_queue_fetch_range(rewind_source *source, const char *path, off_t off,
127  size_t len)
128 {
129  const char *datadir = ((local_source *) source)->datadir;
131  char srcpath[MAXPGPATH];
132  int srcfd;
133  off_t begin = off;
134  off_t end = off + len;
135 
136  snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir, path);
137 
138  srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
139  if (srcfd < 0)
140  pg_fatal("could not open source file \"%s\": %m",
141  srcpath);
142 
143  if (lseek(srcfd, begin, SEEK_SET) == -1)
144  pg_fatal("could not seek in source file: %m");
145 
146  open_target_file(path, false);
147 
148  while (end - begin > 0)
149  {
150  ssize_t readlen;
151  size_t thislen;
152 
153  if (end - begin > sizeof(buf))
154  thislen = sizeof(buf);
155  else
156  thislen = end - begin;
157 
158  readlen = read(srcfd, buf.data, thislen);
159 
160  if (readlen < 0)
161  pg_fatal("could not read file \"%s\": %m", srcpath);
162  else if (readlen == 0)
163  pg_fatal("unexpected EOF while reading file \"%s\"", srcpath);
164 
165  write_target_range(buf.data, begin, readlen);
166  begin += readlen;
167  }
168 
169  if (close(srcfd) != 0)
170  pg_fatal("could not close file \"%s\": %m", srcpath);
171 }
172 
173 static void
175 {
176  /*
177  * Nothing to do, local_queue_fetch_range() copies the ranges immediately.
178  */
179 }
180 
181 static void
183 {
184  pfree(source);
185 }
#define PG_BINARY
Definition: c.h:1252
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void traverse_datadir(const char *datadir, process_file_callback_t callback)
Definition: file_ops.c:362
char * slurpFile(const char *datadir, const char *path, size_t *filesize)
Definition: file_ops.c:314
void open_target_file(const char *path, bool trunc)
Definition: file_ops.c:47
void write_target_range(char *buf, off_t begin, size_t size)
Definition: file_ops.c:88
void(* process_file_callback_t)(const char *path, file_type_t type, size_t size, const char *link_target)
Definition: file_ops.h:26
#define close(a)
Definition: win32.h:12
#define read(a, b, c)
Definition: win32.h:13
static void local_queue_fetch_file(rewind_source *source, const char *path, size_t len)
Definition: local_source.c:75
static void local_destroy(rewind_source *source)
Definition: local_source.c:182
rewind_source * init_local_source(const char *datadir)
Definition: local_source.c:38
static void local_traverse_files(rewind_source *source, process_file_callback_t callback)
Definition: local_source.c:58
static char * local_fetch_file(rewind_source *source, const char *path, size_t *filesize)
Definition: local_source.c:64
static void local_finish_fetch(rewind_source *source)
Definition: local_source.c:174
static void local_queue_fetch_range(rewind_source *source, const char *path, off_t off, size_t len)
Definition: local_source.c:126
void pfree(void *pointer)
Definition: mcxt.c:1521
#define pg_fatal(...)
#define MAXPGPATH
const void size_t len
char * datadir
static rewind_source * source
Definition: pg_rewind.c:89
static char * buf
Definition: pg_test_fsync.c:72
#define snprintf
Definition: port.h:238
rewind_source common
Definition: local_source.c:21
const char * datadir
Definition: local_source.c:23
void(* queue_fetch_file)(struct rewind_source *, const char *path, size_t len)
Definition: rewind_source.h:60
void(* traverse_files)(struct rewind_source *, process_file_callback_t callback)
Definition: rewind_source.h:29
void(* finish_fetch)(struct rewind_source *)
Definition: rewind_source.h:66
XLogRecPtr(* get_current_wal_insert_lsn)(struct rewind_source *)
Definition: rewind_source.h:71
void(* queue_fetch_range)(struct rewind_source *, const char *path, off_t offset, size_t len)
Definition: rewind_source.h:47
char *(* fetch_file)(struct rewind_source *, const char *path, size_t *filesize)
Definition: rewind_source.h:37
void(* destroy)(struct rewind_source *)
Definition: rewind_source.h:76
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46