PostgreSQL Source Code  git master
bbstreamer_gzip.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * bbstreamer_gzip.c
4  *
5  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/bin/pg_basebackup/bbstreamer_gzip.c
9  *-------------------------------------------------------------------------
10  */
11 
12 #include "postgres_fe.h"
13 
14 #include <unistd.h>
15 
16 #ifdef HAVE_LIBZ
17 #include <zlib.h>
18 #endif
19 
20 #include "bbstreamer.h"
21 #include "common/logging.h"
22 #include "common/file_perm.h"
23 #include "common/string.h"
24 
25 #ifdef HAVE_LIBZ
26 typedef struct bbstreamer_gzip_writer
27 {
28  bbstreamer base;
29  char *pathname;
30  gzFile gzfile;
31 } bbstreamer_gzip_writer;
32 
33 typedef struct bbstreamer_gzip_decompressor
34 {
35  bbstreamer base;
36  z_stream zstream;
37  size_t bytes_written;
38 } bbstreamer_gzip_decompressor;
39 
40 static void bbstreamer_gzip_writer_content(bbstreamer *streamer,
41  bbstreamer_member *member,
42  const char *data, int len,
44 static void bbstreamer_gzip_writer_finalize(bbstreamer *streamer);
45 static void bbstreamer_gzip_writer_free(bbstreamer *streamer);
46 static const char *get_gz_error(gzFile gzf);
47 
48 const bbstreamer_ops bbstreamer_gzip_writer_ops = {
49  .content = bbstreamer_gzip_writer_content,
50  .finalize = bbstreamer_gzip_writer_finalize,
51  .free = bbstreamer_gzip_writer_free
52 };
53 
54 static void bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
55  bbstreamer_member *member,
56  const char *data, int len,
58 static void bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer);
59 static void bbstreamer_gzip_decompressor_free(bbstreamer *streamer);
60 static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
61 static void gzip_pfree(void *opaque, void *address);
62 
63 const bbstreamer_ops bbstreamer_gzip_decompressor_ops = {
64  .content = bbstreamer_gzip_decompressor_content,
65  .finalize = bbstreamer_gzip_decompressor_finalize,
66  .free = bbstreamer_gzip_decompressor_free
67 };
68 #endif
69 
70 /*
71  * Create a bbstreamer that just compresses data using gzip, and then writes
72  * it to a file.
73  *
74  * As in the case of bbstreamer_plain_writer_new, pathname is always used
75  * for error reporting purposes; if file is NULL, it is also the opened and
76  * closed so that the data may be written there.
77  */
78 bbstreamer *
79 bbstreamer_gzip_writer_new(char *pathname, FILE *file,
80  pg_compress_specification *compress)
81 {
82 #ifdef HAVE_LIBZ
83  bbstreamer_gzip_writer *streamer;
84 
85  streamer = palloc0(sizeof(bbstreamer_gzip_writer));
86  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
87  &bbstreamer_gzip_writer_ops;
88 
89  streamer->pathname = pstrdup(pathname);
90 
91  if (file == NULL)
92  {
93  streamer->gzfile = gzopen(pathname, "wb");
94  if (streamer->gzfile == NULL)
95  pg_fatal("could not create compressed file \"%s\": %m",
96  pathname);
97  }
98  else
99  {
100  int fd = dup(fileno(file));
101 
102  if (fd < 0)
103  pg_fatal("could not duplicate stdout: %m");
104 
105  streamer->gzfile = gzdopen(fd, "wb");
106  if (streamer->gzfile == NULL)
107  pg_fatal("could not open output file: %m");
108  }
109 
110  if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
111  pg_fatal("could not set compression level %d: %s",
112  compress->level, get_gz_error(streamer->gzfile));
113 
114  return &streamer->base;
115 #else
116  pg_fatal("this build does not support compression with %s", "gzip");
117  return NULL; /* keep compiler quiet */
118 #endif
119 }
120 
121 #ifdef HAVE_LIBZ
122 /*
123  * Write archive content to gzip file.
124  */
125 static void
126 bbstreamer_gzip_writer_content(bbstreamer *streamer,
127  bbstreamer_member *member, const char *data,
128  int len, bbstreamer_archive_context context)
129 {
130  bbstreamer_gzip_writer *mystreamer;
131 
132  mystreamer = (bbstreamer_gzip_writer *) streamer;
133 
134  if (len == 0)
135  return;
136 
137  errno = 0;
138  if (gzwrite(mystreamer->gzfile, data, len) != len)
139  {
140  /* if write didn't set errno, assume problem is no disk space */
141  if (errno == 0)
142  errno = ENOSPC;
143  pg_fatal("could not write to compressed file \"%s\": %s",
144  mystreamer->pathname, get_gz_error(mystreamer->gzfile));
145  }
146 }
147 
148 /*
149  * End-of-archive processing when writing to a gzip file consists of just
150  * calling gzclose.
151  *
152  * It makes no difference whether we opened the file or the caller did it,
153  * because libz provides no way of avoiding a close on the underling file
154  * handle. Notice, however, that bbstreamer_gzip_writer_new() uses dup() to
155  * work around this issue, so that the behavior from the caller's viewpoint
156  * is the same as for bbstreamer_plain_writer.
157  */
158 static void
159 bbstreamer_gzip_writer_finalize(bbstreamer *streamer)
160 {
161  bbstreamer_gzip_writer *mystreamer;
162 
163  mystreamer = (bbstreamer_gzip_writer *) streamer;
164 
165  errno = 0; /* in case gzclose() doesn't set it */
166  if (gzclose(mystreamer->gzfile) != 0)
167  pg_fatal("could not close compressed file \"%s\": %m",
168  mystreamer->pathname);
169 
170  mystreamer->gzfile = NULL;
171 }
172 
173 /*
174  * Free memory associated with this bbstreamer.
175  */
176 static void
177 bbstreamer_gzip_writer_free(bbstreamer *streamer)
178 {
179  bbstreamer_gzip_writer *mystreamer;
180 
181  mystreamer = (bbstreamer_gzip_writer *) streamer;
182 
183  Assert(mystreamer->base.bbs_next == NULL);
184  Assert(mystreamer->gzfile == NULL);
185 
186  pfree(mystreamer->pathname);
187  pfree(mystreamer);
188 }
189 
190 /*
191  * Helper function for libz error reporting.
192  */
193 static const char *
194 get_gz_error(gzFile gzf)
195 {
196  int errnum;
197  const char *errmsg;
198 
199  errmsg = gzerror(gzf, &errnum);
200  if (errnum == Z_ERRNO)
201  return strerror(errno);
202  else
203  return errmsg;
204 }
205 #endif
206 
207 /*
208  * Create a new base backup streamer that performs decompression of gzip
209  * compressed blocks.
210  */
211 bbstreamer *
213 {
214 #ifdef HAVE_LIBZ
215  bbstreamer_gzip_decompressor *streamer;
216  z_stream *zs;
217 
218  Assert(next != NULL);
219 
220  streamer = palloc0(sizeof(bbstreamer_gzip_decompressor));
221  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
222  &bbstreamer_gzip_decompressor_ops;
223 
224  streamer->base.bbs_next = next;
225  initStringInfo(&streamer->base.bbs_buffer);
226 
227  /* Initialize internal stream state for decompression */
228  zs = &streamer->zstream;
229  zs->zalloc = gzip_palloc;
230  zs->zfree = gzip_pfree;
231  zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
232  zs->avail_out = streamer->base.bbs_buffer.maxlen;
233 
234  /*
235  * Data compression was initialized using deflateInit2 to request a gzip
236  * header. Similarly, we are using inflateInit2 to initialize data
237  * decompression.
238  *
239  * Per the documentation for inflateInit2, the second argument is
240  * "windowBits" and its value must be greater than or equal to the value
241  * provided while compressing the data, so we are using the maximum
242  * possible value for safety.
243  */
244  if (inflateInit2(zs, 15 + 16) != Z_OK)
245  pg_fatal("could not initialize compression library");
246 
247  return &streamer->base;
248 #else
249  pg_fatal("this build does not support compression with %s", "gzip");
250  return NULL; /* keep compiler quiet */
251 #endif
252 }
253 
254 #ifdef HAVE_LIBZ
255 /*
256  * Decompress the input data to output buffer until we run out of input
257  * data. Each time the output buffer is full, pass on the decompressed data
258  * to the next streamer.
259  */
260 static void
261 bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
262  bbstreamer_member *member,
263  const char *data, int len,
265 {
266  bbstreamer_gzip_decompressor *mystreamer;
267  z_stream *zs;
268 
269  mystreamer = (bbstreamer_gzip_decompressor *) streamer;
270 
271  zs = &mystreamer->zstream;
272  zs->next_in = (uint8 *) data;
273  zs->avail_in = len;
274 
275  /* Process the current chunk */
276  while (zs->avail_in > 0)
277  {
278  int res;
279 
280  Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
281 
282  zs->next_out = (uint8 *)
283  mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
284  zs->avail_out =
285  mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
286 
287  /*
288  * This call decompresses data starting at zs->next_in and updates
289  * zs->next_in * and zs->avail_in. It generates output data starting
290  * at zs->next_out and updates zs->next_out and zs->avail_out
291  * accordingly.
292  */
293  res = inflate(zs, Z_NO_FLUSH);
294 
295  if (res == Z_STREAM_ERROR)
296  pg_log_error("could not decompress data: %s", zs->msg);
297 
298  mystreamer->bytes_written =
299  mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
300 
301  /* If output buffer is full then pass data to next streamer */
302  if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
303  {
304  bbstreamer_content(mystreamer->base.bbs_next, member,
305  mystreamer->base.bbs_buffer.data,
306  mystreamer->base.bbs_buffer.maxlen, context);
307  mystreamer->bytes_written = 0;
308  }
309  }
310 }
311 
312 /*
313  * End-of-stream processing.
314  */
315 static void
316 bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer)
317 {
318  bbstreamer_gzip_decompressor *mystreamer;
319 
320  mystreamer = (bbstreamer_gzip_decompressor *) streamer;
321 
322  /*
323  * End of the stream, if there is some pending data in output buffers then
324  * we must forward it to next streamer.
325  */
326  bbstreamer_content(mystreamer->base.bbs_next, NULL,
327  mystreamer->base.bbs_buffer.data,
328  mystreamer->base.bbs_buffer.maxlen,
330 
331  bbstreamer_finalize(mystreamer->base.bbs_next);
332 }
333 
334 /*
335  * Free memory.
336  */
337 static void
338 bbstreamer_gzip_decompressor_free(bbstreamer *streamer)
339 {
340  bbstreamer_free(streamer->bbs_next);
341  pfree(streamer->bbs_buffer.data);
342  pfree(streamer);
343 }
344 
/*
 * Wrapper function to adjust the signature of palloc to match what libz
 * expects.
 *
 * opaque is unused.  The product is computed in size_t rather than unsigned
 * int, so it is not truncated to unsigned int width on platforms where
 * size_t is wider.
 */
static void *
gzip_palloc(void *opaque, unsigned items, unsigned size)
{
	return palloc((size_t) items * (size_t) size);
}
354 
/*
 * zfree callback for libz: adapts pfree to the signature libz expects.
 * The opaque argument is not used.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	(void) opaque;

	pfree(address);
}
364 #endif
static void bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:126
static void bbstreamer_finalize(bbstreamer *streamer)
Definition: bbstreamer.h:136
bbstreamer_archive_context
Definition: bbstreamer.h:54
@ BBSTREAMER_UNKNOWN
Definition: bbstreamer.h:55
static void bbstreamer_free(bbstreamer *streamer)
Definition: bbstreamer.h:144
bbstreamer * bbstreamer_gzip_writer_new(char *pathname, FILE *file, pg_compress_specification *compress)
bbstreamer * bbstreamer_gzip_decompressor_new(bbstreamer *next)
static int32 next
Definition: blutils.c:219
unsigned char uint8
Definition: c.h:488
int errmsg(const char *fmt,...)
Definition: elog.c:1069
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
Definition: logging.h:106
char * pstrdup(const char *in)
Definition: mcxt.c:1624
void pfree(void *pointer)
Definition: mcxt.c:1436
void * palloc0(Size size)
Definition: mcxt.c:1241
void * palloc(Size size)
Definition: mcxt.c:1210
#define pg_fatal(...)
const void size_t len
const void * data
#define strerror
Definition: port.h:251
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void(* content)(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:117
StringInfoData bbs_buffer
Definition: bbstreamer.h:102
bbstreamer * bbs_next
Definition: bbstreamer.h:101