PostgreSQL Source Code  git master
astreamer_gzip.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * astreamer_gzip.c
4  *
5  * Archive streamers that deal with data compressed using gzip.
6  * astreamer_gzip_writer applies gzip compression to the input data
7  * and writes the result to a file. astreamer_gzip_decompressor assumes
8  * that the input stream is compressed using gzip and decompresses it.
9  *
10  * Note that the code in this file is asymmetric with what we do for
11  * other compression types: for lz4 and zstd, there is a compressor and
12  * a decompressor, rather than a writer and a decompressor. The approach
13  * taken here is less flexible, because a writer can only write to a file,
14  * while a compressor can write to a subsequent astreamer which is free
15  * to do whatever it likes. The reason it's like this is because this
16  * code was adapted from old, less-modular pg_basebackup code that used
17  * the same APIs that astreamer_gzip_writer now uses, and it didn't seem
18  * necessary to change anything at the time.
19  *
20  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
21  *
22  * IDENTIFICATION
23  * src/bin/pg_basebackup/astreamer_gzip.c
24  *-------------------------------------------------------------------------
25  */
26 
27 #include "postgres_fe.h"
28 
29 #include <unistd.h>
30 
31 #ifdef HAVE_LIBZ
32 #include <zlib.h>
33 #endif
34 
35 #include "common/file_perm.h"
36 #include "common/logging.h"
37 #include "common/string.h"
38 #include "fe_utils/astreamer.h"
39 
40 #ifdef HAVE_LIBZ
41 typedef struct astreamer_gzip_writer
42 {
43  astreamer base;
44  char *pathname;
45  gzFile gzfile;
46 } astreamer_gzip_writer;
47 
48 typedef struct astreamer_gzip_decompressor
49 {
50  astreamer base;
51  z_stream zstream;
52  size_t bytes_written;
53 } astreamer_gzip_decompressor;
54 
55 static void astreamer_gzip_writer_content(astreamer *streamer,
56  astreamer_member *member,
57  const char *data, int len,
59 static void astreamer_gzip_writer_finalize(astreamer *streamer);
60 static void astreamer_gzip_writer_free(astreamer *streamer);
61 static const char *get_gz_error(gzFile gzf);
62 
63 static const astreamer_ops astreamer_gzip_writer_ops = {
64  .content = astreamer_gzip_writer_content,
65  .finalize = astreamer_gzip_writer_finalize,
66  .free = astreamer_gzip_writer_free
67 };
68 
69 static void astreamer_gzip_decompressor_content(astreamer *streamer,
70  astreamer_member *member,
71  const char *data, int len,
73 static void astreamer_gzip_decompressor_finalize(astreamer *streamer);
74 static void astreamer_gzip_decompressor_free(astreamer *streamer);
75 static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
76 static void gzip_pfree(void *opaque, void *address);
77 
78 static const astreamer_ops astreamer_gzip_decompressor_ops = {
79  .content = astreamer_gzip_decompressor_content,
80  .finalize = astreamer_gzip_decompressor_finalize,
81  .free = astreamer_gzip_decompressor_free
82 };
83 #endif
84 
85 /*
86  * Create a astreamer that just compresses data using gzip, and then writes
87  * it to a file.
88  *
89  * The caller must specify a pathname and may specify a file. The pathname is
90  * used for error-reporting purposes either way. If file is NULL, the pathname
91  * also identifies the file to which the data should be written: it is opened
92  * for writing and closed when done. If file is not NULL, the data is written
93  * there.
94  *
95  * Note that zlib does not use the FILE interface, but operates directly on
96  * a duplicate of the underlying fd. Hence, callers must take care if they
97  * plan to write any other data to the same FILE, either before or after using
98  * this.
99  */
100 astreamer *
101 astreamer_gzip_writer_new(char *pathname, FILE *file,
102  pg_compress_specification *compress)
103 {
104 #ifdef HAVE_LIBZ
105  astreamer_gzip_writer *streamer;
106 
107  streamer = palloc0(sizeof(astreamer_gzip_writer));
108  *((const astreamer_ops **) &streamer->base.bbs_ops) =
109  &astreamer_gzip_writer_ops;
110 
111  streamer->pathname = pstrdup(pathname);
112 
113  if (file == NULL)
114  {
115  streamer->gzfile = gzopen(pathname, "wb");
116  if (streamer->gzfile == NULL)
117  pg_fatal("could not create compressed file \"%s\": %m",
118  pathname);
119  }
120  else
121  {
122  /*
123  * We must dup the file handle so that gzclose doesn't break the
124  * caller's FILE. See comment for astreamer_gzip_writer_finalize.
125  */
126  int fd = dup(fileno(file));
127 
128  if (fd < 0)
129  pg_fatal("could not duplicate stdout: %m");
130 
131  streamer->gzfile = gzdopen(fd, "wb");
132  if (streamer->gzfile == NULL)
133  pg_fatal("could not open output file: %m");
134  }
135 
136  if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
137  pg_fatal("could not set compression level %d: %s",
138  compress->level, get_gz_error(streamer->gzfile));
139 
140  return &streamer->base;
141 #else
142  pg_fatal("this build does not support compression with %s", "gzip");
143  return NULL; /* keep compiler quiet */
144 #endif
145 }
146 
147 #ifdef HAVE_LIBZ
148 /*
149  * Write archive content to gzip file.
150  */
151 static void
152 astreamer_gzip_writer_content(astreamer *streamer,
153  astreamer_member *member, const char *data,
155 {
156  astreamer_gzip_writer *mystreamer;
157 
158  mystreamer = (astreamer_gzip_writer *) streamer;
159 
160  if (len == 0)
161  return;
162 
163  errno = 0;
164  if (gzwrite(mystreamer->gzfile, data, len) != len)
165  {
166  /* if write didn't set errno, assume problem is no disk space */
167  if (errno == 0)
168  errno = ENOSPC;
169  pg_fatal("could not write to compressed file \"%s\": %s",
170  mystreamer->pathname, get_gz_error(mystreamer->gzfile));
171  }
172 }
173 
174 /*
175  * End-of-archive processing when writing to a gzip file consists of just
176  * calling gzclose.
177  *
178  * It makes no difference whether we opened the file or the caller did it,
179  * because libz provides no way of avoiding a close on the underlying file
180  * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to
181  * work around this issue, so that the behavior from the caller's viewpoint
182  * is the same as for astreamer_plain_writer.
183  */
184 static void
185 astreamer_gzip_writer_finalize(astreamer *streamer)
186 {
187  astreamer_gzip_writer *mystreamer;
188 
189  mystreamer = (astreamer_gzip_writer *) streamer;
190 
191  errno = 0; /* in case gzclose() doesn't set it */
192  if (gzclose(mystreamer->gzfile) != 0)
193  pg_fatal("could not close compressed file \"%s\": %m",
194  mystreamer->pathname);
195 
196  mystreamer->gzfile = NULL;
197 }
198 
199 /*
200  * Free memory associated with this astreamer.
201  */
202 static void
203 astreamer_gzip_writer_free(astreamer *streamer)
204 {
205  astreamer_gzip_writer *mystreamer;
206 
207  mystreamer = (astreamer_gzip_writer *) streamer;
208 
209  Assert(mystreamer->base.bbs_next == NULL);
210  Assert(mystreamer->gzfile == NULL);
211 
212  pfree(mystreamer->pathname);
213  pfree(mystreamer);
214 }
215 
216 /*
217  * Helper function for libz error reporting.
218  */
219 static const char *
220 get_gz_error(gzFile gzf)
221 {
222  int errnum;
223  const char *errmsg;
224 
225  errmsg = gzerror(gzf, &errnum);
226  if (errnum == Z_ERRNO)
227  return strerror(errno);
228  else
229  return errmsg;
230 }
231 #endif
232 
233 /*
234  * Create a new base backup streamer that performs decompression of gzip
235  * compressed blocks.
236  */
237 astreamer *
239 {
240 #ifdef HAVE_LIBZ
241  astreamer_gzip_decompressor *streamer;
242  z_stream *zs;
243 
244  Assert(next != NULL);
245 
246  streamer = palloc0(sizeof(astreamer_gzip_decompressor));
247  *((const astreamer_ops **) &streamer->base.bbs_ops) =
248  &astreamer_gzip_decompressor_ops;
249 
250  streamer->base.bbs_next = next;
251  initStringInfo(&streamer->base.bbs_buffer);
252 
253  /* Initialize internal stream state for decompression */
254  zs = &streamer->zstream;
255  zs->zalloc = gzip_palloc;
256  zs->zfree = gzip_pfree;
257  zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
258  zs->avail_out = streamer->base.bbs_buffer.maxlen;
259 
260  /*
261  * Data compression was initialized using deflateInit2 to request a gzip
262  * header. Similarly, we are using inflateInit2 to initialize data
263  * decompression.
264  *
265  * Per the documentation for inflateInit2, the second argument is
266  * "windowBits" and its value must be greater than or equal to the value
267  * provided while compressing the data, so we are using the maximum
268  * possible value for safety.
269  */
270  if (inflateInit2(zs, 15 + 16) != Z_OK)
271  pg_fatal("could not initialize compression library");
272 
273  return &streamer->base;
274 #else
275  pg_fatal("this build does not support compression with %s", "gzip");
276  return NULL; /* keep compiler quiet */
277 #endif
278 }
279 
280 #ifdef HAVE_LIBZ
281 /*
282  * Decompress the input data to output buffer until we run out of input
283  * data. Each time the output buffer is full, pass on the decompressed data
284  * to the next streamer.
285  */
286 static void
287 astreamer_gzip_decompressor_content(astreamer *streamer,
288  astreamer_member *member,
289  const char *data, int len,
291 {
292  astreamer_gzip_decompressor *mystreamer;
293  z_stream *zs;
294 
295  mystreamer = (astreamer_gzip_decompressor *) streamer;
296 
297  zs = &mystreamer->zstream;
298  zs->next_in = (const uint8 *) data;
299  zs->avail_in = len;
300 
301  /* Process the current chunk */
302  while (zs->avail_in > 0)
303  {
304  int res;
305 
306  Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
307 
308  zs->next_out = (uint8 *)
309  mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
310  zs->avail_out =
311  mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
312 
313  /*
314  * This call decompresses data starting at zs->next_in and updates
315  * zs->next_in * and zs->avail_in. It generates output data starting
316  * at zs->next_out and updates zs->next_out and zs->avail_out
317  * accordingly.
318  */
319  res = inflate(zs, Z_NO_FLUSH);
320 
321  if (res == Z_STREAM_ERROR)
322  pg_log_error("could not decompress data: %s", zs->msg);
323 
324  mystreamer->bytes_written =
325  mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
326 
327  /* If output buffer is full then pass data to next streamer */
328  if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
329  {
330  astreamer_content(mystreamer->base.bbs_next, member,
331  mystreamer->base.bbs_buffer.data,
332  mystreamer->base.bbs_buffer.maxlen, context);
333  mystreamer->bytes_written = 0;
334  }
335  }
336 }
337 
338 /*
339  * End-of-stream processing.
340  */
341 static void
342 astreamer_gzip_decompressor_finalize(astreamer *streamer)
343 {
344  astreamer_gzip_decompressor *mystreamer;
345 
346  mystreamer = (astreamer_gzip_decompressor *) streamer;
347 
348  /*
349  * End of the stream, if there is some pending data in output buffers then
350  * we must forward it to next streamer.
351  */
352  astreamer_content(mystreamer->base.bbs_next, NULL,
353  mystreamer->base.bbs_buffer.data,
354  mystreamer->base.bbs_buffer.maxlen,
356 
357  astreamer_finalize(mystreamer->base.bbs_next);
358 }
359 
360 /*
361  * Free memory.
362  */
363 static void
364 astreamer_gzip_decompressor_free(astreamer *streamer)
365 {
366  astreamer_free(streamer->bbs_next);
367  pfree(streamer->bbs_buffer.data);
368  pfree(streamer);
369 }
370 
/*
 * Wrapper function to adjust the signature of palloc to match what libz
 * expects.
 *
 * libz asks for 'items' objects of 'size' bytes each; 'opaque' is unused.
 * The product is computed in size_t rather than unsigned so that it does not
 * wrap before reaching palloc.  If it would still wrap, return NULL, which
 * libz treats as an allocation failure.
 */
static void *
gzip_palloc(void *opaque, unsigned items, unsigned size)
{
	size_t		nbytes = (size_t) items * size;

	if (size != 0 && nbytes / size != items)
		return NULL;

	return palloc(nbytes);
}
380 
/*
 * Wrapper function to adjust the signature of pfree to match what libz
 * expects.  'opaque' is part of libz's callback signature and is ignored.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	(void) opaque;

	pfree(address);
}
390 #endif
static void astreamer_free(astreamer *streamer)
Definition: astreamer.h:153
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition: astreamer.h:135
static void astreamer_finalize(astreamer *streamer)
Definition: astreamer.h:145
astreamer_archive_context
Definition: astreamer.h:63
@ ASTREAMER_UNKNOWN
Definition: astreamer.h:64
astreamer * astreamer_gzip_decompressor_new(astreamer *next)
astreamer * astreamer_gzip_writer_new(char *pathname, FILE *file, pg_compress_specification *compress)
static int32 next
Definition: blutils.c:222
#define Assert(condition)
Definition: c.h:849
unsigned char uint8
Definition: c.h:504
int errmsg(const char *fmt,...)
Definition: elog.c:1070
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
#define pg_log_error(...)
Definition: logging.h:106
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
void * palloc(Size size)
Definition: mcxt.c:1317
#define pg_fatal(...)
const void size_t len
const void * data
#define strerror
Definition: port.h:251
static int fd(const char *x, int i)
Definition: preproc-init.c:105
tree context
Definition: radixtree.h:1835
static pg_noinline void Size size
Definition: slab.c:607
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition: astreamer.h:126
StringInfoData bbs_buffer
Definition: astreamer.h:111
astreamer * bbs_next
Definition: astreamer.h:110
static ItemArray items
Definition: test_tidstore.c:49