PostgreSQL Source Code  git master
bbstreamer_gzip.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * bbstreamer_gzip.c
4  *
5  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/bin/pg_basebackup/bbstreamer_gzip.c
9  *-------------------------------------------------------------------------
10  */
11 
12 #include "postgres_fe.h"
13 
14 #include <unistd.h>
15 
16 #ifdef HAVE_LIBZ
17 #include <zlib.h>
18 #endif
19 
20 #include "bbstreamer.h"
21 #include "common/logging.h"
22 #include "common/file_perm.h"
23 #include "common/string.h"
24 
#ifdef HAVE_LIBZ
/*
 * Streamer that gzip-compresses the data it receives and writes it to a file.
 */
typedef struct bbstreamer_gzip_writer
{
	bbstreamer	base;
	char	   *pathname;		/* used for error messages, and for gzopen()
								 * when the caller supplied no FILE */
	gzFile		gzfile;			/* open gzip handle; NULL once finalized */
} bbstreamer_gzip_writer;

/*
 * Streamer that decompresses gzip data and passes the result on to the next
 * streamer in the chain.
 */
typedef struct bbstreamer_gzip_decompressor
{
	bbstreamer	base;
	z_stream	zstream;		/* zlib inflate state */
	size_t		bytes_written;	/* bytes at the start of base.bbs_buffer
								 * holding output not yet forwarded */
} bbstreamer_gzip_decompressor;

static void bbstreamer_gzip_writer_content(bbstreamer *streamer,
										   bbstreamer_member *member,
										   const char *data, int len,
										   bbstreamer_archive_context context);
static void bbstreamer_gzip_writer_finalize(bbstreamer *streamer);
static void bbstreamer_gzip_writer_free(bbstreamer *streamer);
static const char *get_gz_error(gzFile gzf);

const bbstreamer_ops bbstreamer_gzip_writer_ops = {
	.content = bbstreamer_gzip_writer_content,
	.finalize = bbstreamer_gzip_writer_finalize,
	.free = bbstreamer_gzip_writer_free
};

static void bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
												 bbstreamer_member *member,
												 const char *data, int len,
												 bbstreamer_archive_context context);
static void bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer);
static void bbstreamer_gzip_decompressor_free(bbstreamer *streamer);
static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
static void gzip_pfree(void *opaque, void *address);

const bbstreamer_ops bbstreamer_gzip_decompressor_ops = {
	.content = bbstreamer_gzip_decompressor_content,
	.finalize = bbstreamer_gzip_decompressor_finalize,
	.free = bbstreamer_gzip_decompressor_free
};
#endif
69 
70 /*
71  * Create a bbstreamer that just compresses data using gzip, and then writes
72  * it to a file.
73  *
74  * As in the case of bbstreamer_plain_writer_new, pathname is always used
75  * for error reporting purposes; if file is NULL, it is also the opened and
76  * closed so that the data may be written there.
77  */
78 bbstreamer *
79 bbstreamer_gzip_writer_new(char *pathname, FILE *file,
80  pg_compress_specification *compress)
81 {
82 #ifdef HAVE_LIBZ
83  bbstreamer_gzip_writer *streamer;
84 
85  streamer = palloc0(sizeof(bbstreamer_gzip_writer));
86  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
87  &bbstreamer_gzip_writer_ops;
88 
89  streamer->pathname = pstrdup(pathname);
90 
91  if (file == NULL)
92  {
93  streamer->gzfile = gzopen(pathname, "wb");
94  if (streamer->gzfile == NULL)
95  pg_fatal("could not create compressed file \"%s\": %m",
96  pathname);
97  }
98  else
99  {
100  int fd = dup(fileno(file));
101 
102  if (fd < 0)
103  pg_fatal("could not duplicate stdout: %m");
104 
105  streamer->gzfile = gzdopen(fd, "wb");
106  if (streamer->gzfile == NULL)
107  pg_fatal("could not open output file: %m");
108  }
109 
110  if ((compress->options & PG_COMPRESSION_OPTION_LEVEL) != 0 &&
111  gzsetparams(streamer->gzfile, compress->level,
112  Z_DEFAULT_STRATEGY) != Z_OK)
113  pg_fatal("could not set compression level %d: %s",
114  compress->level, get_gz_error(streamer->gzfile));
115 
116  return &streamer->base;
117 #else
118  pg_fatal("this build does not support gzip compression");
119  return NULL; /* keep compiler quiet */
120 #endif
121 }
122 
123 #ifdef HAVE_LIBZ
124 /*
125  * Write archive content to gzip file.
126  */
127 static void
128 bbstreamer_gzip_writer_content(bbstreamer *streamer,
129  bbstreamer_member *member, const char *data,
130  int len, bbstreamer_archive_context context)
131 {
132  bbstreamer_gzip_writer *mystreamer;
133 
134  mystreamer = (bbstreamer_gzip_writer *) streamer;
135 
136  if (len == 0)
137  return;
138 
139  errno = 0;
140  if (gzwrite(mystreamer->gzfile, data, len) != len)
141  {
142  /* if write didn't set errno, assume problem is no disk space */
143  if (errno == 0)
144  errno = ENOSPC;
145  pg_fatal("could not write to compressed file \"%s\": %s",
146  mystreamer->pathname, get_gz_error(mystreamer->gzfile));
147  }
148 }
149 
150 /*
151  * End-of-archive processing when writing to a gzip file consists of just
152  * calling gzclose.
153  *
154  * It makes no difference whether we opened the file or the caller did it,
155  * because libz provides no way of avoiding a close on the underling file
156  * handle. Notice, however, that bbstreamer_gzip_writer_new() uses dup() to
157  * work around this issue, so that the behavior from the caller's viewpoint
158  * is the same as for bbstreamer_plain_writer.
159  */
160 static void
161 bbstreamer_gzip_writer_finalize(bbstreamer *streamer)
162 {
163  bbstreamer_gzip_writer *mystreamer;
164 
165  mystreamer = (bbstreamer_gzip_writer *) streamer;
166 
167  errno = 0; /* in case gzclose() doesn't set it */
168  if (gzclose(mystreamer->gzfile) != 0)
169  pg_fatal("could not close compressed file \"%s\": %m",
170  mystreamer->pathname);
171 
172  mystreamer->gzfile = NULL;
173 }
174 
175 /*
176  * Free memory associated with this bbstreamer.
177  */
178 static void
179 bbstreamer_gzip_writer_free(bbstreamer *streamer)
180 {
181  bbstreamer_gzip_writer *mystreamer;
182 
183  mystreamer = (bbstreamer_gzip_writer *) streamer;
184 
185  Assert(mystreamer->base.bbs_next == NULL);
186  Assert(mystreamer->gzfile == NULL);
187 
188  pfree(mystreamer->pathname);
189  pfree(mystreamer);
190 }
191 
192 /*
193  * Helper function for libz error reporting.
194  */
195 static const char *
196 get_gz_error(gzFile gzf)
197 {
198  int errnum;
199  const char *errmsg;
200 
201  errmsg = gzerror(gzf, &errnum);
202  if (errnum == Z_ERRNO)
203  return strerror(errno);
204  else
205  return errmsg;
206 }
207 #endif
208 
209 /*
210  * Create a new base backup streamer that performs decompression of gzip
211  * compressed blocks.
212  */
213 bbstreamer *
215 {
216 #ifdef HAVE_LIBZ
217  bbstreamer_gzip_decompressor *streamer;
218  z_stream *zs;
219 
220  Assert(next != NULL);
221 
222  streamer = palloc0(sizeof(bbstreamer_gzip_decompressor));
223  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
224  &bbstreamer_gzip_decompressor_ops;
225 
226  streamer->base.bbs_next = next;
227  initStringInfo(&streamer->base.bbs_buffer);
228 
229  /* Initialize internal stream state for decompression */
230  zs = &streamer->zstream;
231  zs->zalloc = gzip_palloc;
232  zs->zfree = gzip_pfree;
233  zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
234  zs->avail_out = streamer->base.bbs_buffer.maxlen;
235 
236  /*
237  * Data compression was initialized using deflateInit2 to request a gzip
238  * header. Similarly, we are using inflateInit2 to initialize data
239  * decompression.
240  *
241  * Per the documentation for inflateInit2, the second argument is
242  * "windowBits" and its value must be greater than or equal to the value
243  * provided while compressing the data, so we are using the maximum
244  * possible value for safety.
245  */
246  if (inflateInit2(zs, 15 + 16) != Z_OK)
247  pg_fatal("could not initialize compression library");
248 
249  return &streamer->base;
250 #else
251  pg_fatal("this build does not support gzip compression");
252  return NULL; /* keep compiler quiet */
253 #endif
254 }
255 
256 #ifdef HAVE_LIBZ
257 /*
258  * Decompress the input data to output buffer until we run out of input
259  * data. Each time the output buffer is full, pass on the decompressed data
260  * to the next streamer.
261  */
262 static void
263 bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
264  bbstreamer_member *member,
265  const char *data, int len,
267 {
268  bbstreamer_gzip_decompressor *mystreamer;
269  z_stream *zs;
270 
271  mystreamer = (bbstreamer_gzip_decompressor *) streamer;
272 
273  zs = &mystreamer->zstream;
274  zs->next_in = (uint8 *) data;
275  zs->avail_in = len;
276 
277  /* Process the current chunk */
278  while (zs->avail_in > 0)
279  {
280  int res;
281 
282  Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
283 
284  zs->next_out = (uint8 *)
285  mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
286  zs->avail_out =
287  mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
288 
289  /*
290  * This call decompresses data starting at zs->next_in and updates
291  * zs->next_in * and zs->avail_in. It generates output data starting
292  * at zs->next_out and updates zs->next_out and zs->avail_out
293  * accordingly.
294  */
295  res = inflate(zs, Z_NO_FLUSH);
296 
297  if (res == Z_STREAM_ERROR)
298  pg_log_error("could not decompress data: %s", zs->msg);
299 
300  mystreamer->bytes_written =
301  mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
302 
303  /* If output buffer is full then pass data to next streamer */
304  if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
305  {
306  bbstreamer_content(mystreamer->base.bbs_next, member,
307  mystreamer->base.bbs_buffer.data,
308  mystreamer->base.bbs_buffer.maxlen, context);
309  mystreamer->bytes_written = 0;
310  }
311  }
312 }
313 
314 /*
315  * End-of-stream processing.
316  */
317 static void
318 bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer)
319 {
320  bbstreamer_gzip_decompressor *mystreamer;
321 
322  mystreamer = (bbstreamer_gzip_decompressor *) streamer;
323 
324  /*
325  * End of the stream, if there is some pending data in output buffers then
326  * we must forward it to next streamer.
327  */
328  bbstreamer_content(mystreamer->base.bbs_next, NULL,
329  mystreamer->base.bbs_buffer.data,
330  mystreamer->base.bbs_buffer.maxlen,
332 
333  bbstreamer_finalize(mystreamer->base.bbs_next);
334 }
335 
336 /*
337  * Free memory.
338  */
339 static void
340 bbstreamer_gzip_decompressor_free(bbstreamer *streamer)
341 {
342  bbstreamer_free(streamer->bbs_next);
343  pfree(streamer->bbs_buffer.data);
344  pfree(streamer);
345 }
346 
347 /*
348  * Wrapper function to adjust the signature of palloc to match what libz
349  * expects.
350  */
351 static void *
352 gzip_palloc(void *opaque, unsigned items, unsigned size)
353 {
354  return palloc(items * size);
355 }
356 
/*
 * zfree callback for libz: release "address" with pfree.  libz passes its
 * opaque pointer as the first argument; we have no use for it.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	pfree(address);
}
366 #endif
static void bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:126
static void bbstreamer_finalize(bbstreamer *streamer)
Definition: bbstreamer.h:136
bbstreamer_archive_context
Definition: bbstreamer.h:54
@ BBSTREAMER_UNKNOWN
Definition: bbstreamer.h:55
static void bbstreamer_free(bbstreamer *streamer)
Definition: bbstreamer.h:144
bbstreamer * bbstreamer_gzip_writer_new(char *pathname, FILE *file, pg_compress_specification *compress)
bbstreamer * bbstreamer_gzip_decompressor_new(bbstreamer *next)
static int32 next
Definition: blutils.c:219
unsigned char uint8
Definition: c.h:439
#define PG_COMPRESSION_OPTION_LEVEL
Definition: compression.h:25
int errmsg(const char *fmt,...)
Definition: elog.c:904
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
Definition: logging.h:106
char * pstrdup(const char *in)
Definition: mcxt.c:1305
void pfree(void *pointer)
Definition: mcxt.c:1175
void * palloc0(Size size)
Definition: mcxt.c:1099
void * palloc(Size size)
Definition: mcxt.c:1068
#define pg_fatal(...)
const void size_t len
const void * data
#define strerror
Definition: port.h:238
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void(* content)(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:117
StringInfoData bbs_buffer
Definition: bbstreamer.h:102
bbstreamer * bbs_next
Definition: bbstreamer.h:101