PostgreSQL Source Code git master
astreamer_gzip.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * astreamer_gzip.c
4 *
5 * Archive streamers that deal with data compressed using gzip.
6 * astreamer_gzip_writer applies gzip compression to the input data
7 * and writes the result to a file. astreamer_gzip_decompressor assumes
8 * that the input stream is compressed using gzip and decompresses it.
9 *
10 * Note that the code in this file is asymmetric with what we do for
11 * other compression types: for lz4 and zstd, there is a compressor and
12 * a decompressor, rather than a writer and a decompressor. The approach
13 * taken here is less flexible, because a writer can only write to a file,
14 * while a compressor can write to a subsequent astreamer which is free
15 * to do whatever it likes. The reason it's like this is because this
16 * code was adapted from old, less-modular pg_basebackup code that used
17 * the same APIs that astreamer_gzip_writer now uses, and it didn't seem
18 * necessary to change anything at the time.
19 *
20 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
21 *
22 * IDENTIFICATION
23 * src/fe_utils/astreamer_gzip.c
24 *-------------------------------------------------------------------------
25 */
26
27#include "postgres_fe.h"
28
29#include <unistd.h>
30
31#ifdef HAVE_LIBZ
32#include <zlib.h>
33#endif
34
35#include "common/logging.h"
36#include "fe_utils/astreamer.h"
37
38#ifdef HAVE_LIBZ
39typedef struct astreamer_gzip_writer
40{
41 astreamer base;
42 char *pathname;
43 gzFile gzfile;
44} astreamer_gzip_writer;
45
46typedef struct astreamer_gzip_decompressor
47{
48 astreamer base;
49 z_stream zstream;
50 size_t bytes_written;
51} astreamer_gzip_decompressor;
52
53static void astreamer_gzip_writer_content(astreamer *streamer,
54 astreamer_member *member,
55 const char *data, int len,
57static void astreamer_gzip_writer_finalize(astreamer *streamer);
58static void astreamer_gzip_writer_free(astreamer *streamer);
59static const char *get_gz_error(gzFile gzf);
60
61static const astreamer_ops astreamer_gzip_writer_ops = {
62 .content = astreamer_gzip_writer_content,
63 .finalize = astreamer_gzip_writer_finalize,
64 .free = astreamer_gzip_writer_free
65};
66
67static void astreamer_gzip_decompressor_content(astreamer *streamer,
68 astreamer_member *member,
69 const char *data, int len,
71static void astreamer_gzip_decompressor_finalize(astreamer *streamer);
72static void astreamer_gzip_decompressor_free(astreamer *streamer);
73static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
74static void gzip_pfree(void *opaque, void *address);
75
76static const astreamer_ops astreamer_gzip_decompressor_ops = {
77 .content = astreamer_gzip_decompressor_content,
78 .finalize = astreamer_gzip_decompressor_finalize,
79 .free = astreamer_gzip_decompressor_free
80};
81#endif
82
83/*
84 * Create a astreamer that just compresses data using gzip, and then writes
85 * it to a file.
86 *
87 * The caller must specify a pathname and may specify a file. The pathname is
88 * used for error-reporting purposes either way. If file is NULL, the pathname
89 * also identifies the file to which the data should be written: it is opened
90 * for writing and closed when done. If file is not NULL, the data is written
91 * there.
92 *
93 * Note that zlib does not use the FILE interface, but operates directly on
94 * a duplicate of the underlying fd. Hence, callers must take care if they
95 * plan to write any other data to the same FILE, either before or after using
96 * this.
97 */
99astreamer_gzip_writer_new(char *pathname, FILE *file,
101{
102#ifdef HAVE_LIBZ
103 astreamer_gzip_writer *streamer;
104
105 streamer = palloc0(sizeof(astreamer_gzip_writer));
106 *((const astreamer_ops **) &streamer->base.bbs_ops) =
107 &astreamer_gzip_writer_ops;
108
109 streamer->pathname = pstrdup(pathname);
110
111 if (file == NULL)
112 {
113 streamer->gzfile = gzopen(pathname, "wb");
114 if (streamer->gzfile == NULL)
115 pg_fatal("could not create compressed file \"%s\": %m",
116 pathname);
117 }
118 else
119 {
120 /*
121 * We must dup the file handle so that gzclose doesn't break the
122 * caller's FILE. See comment for astreamer_gzip_writer_finalize.
123 */
124 int fd = dup(fileno(file));
125
126 if (fd < 0)
127 pg_fatal("could not duplicate stdout: %m");
128
129 streamer->gzfile = gzdopen(fd, "wb");
130 if (streamer->gzfile == NULL)
131 pg_fatal("could not open output file: %m");
132 }
133
134 if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
135 pg_fatal("could not set compression level %d: %s",
136 compress->level, get_gz_error(streamer->gzfile));
137
138 return &streamer->base;
139#else
140 pg_fatal("this build does not support compression with %s", "gzip");
141 return NULL; /* keep compiler quiet */
142#endif
143}
144
145#ifdef HAVE_LIBZ
146/*
147 * Write archive content to gzip file.
148 */
149static void
150astreamer_gzip_writer_content(astreamer *streamer,
151 astreamer_member *member, const char *data,
152 int len, astreamer_archive_context context)
153{
154 astreamer_gzip_writer *mystreamer;
155
156 mystreamer = (astreamer_gzip_writer *) streamer;
157
158 if (len == 0)
159 return;
160
161 errno = 0;
162 if (gzwrite(mystreamer->gzfile, data, len) != len)
163 {
164 /* if write didn't set errno, assume problem is no disk space */
165 if (errno == 0)
166 errno = ENOSPC;
167 pg_fatal("could not write to compressed file \"%s\": %s",
168 mystreamer->pathname, get_gz_error(mystreamer->gzfile));
169 }
170}
171
172/*
173 * End-of-archive processing when writing to a gzip file consists of just
174 * calling gzclose.
175 *
176 * It makes no difference whether we opened the file or the caller did it,
177 * because libz provides no way of avoiding a close on the underlying file
178 * handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to
179 * work around this issue, so that the behavior from the caller's viewpoint
180 * is the same as for astreamer_plain_writer.
181 */
182static void
183astreamer_gzip_writer_finalize(astreamer *streamer)
184{
185 astreamer_gzip_writer *mystreamer;
186
187 mystreamer = (astreamer_gzip_writer *) streamer;
188
189 errno = 0; /* in case gzclose() doesn't set it */
190 if (gzclose(mystreamer->gzfile) != 0)
191 pg_fatal("could not close compressed file \"%s\": %m",
192 mystreamer->pathname);
193
194 mystreamer->gzfile = NULL;
195}
196
197/*
198 * Free memory associated with this astreamer.
199 */
200static void
201astreamer_gzip_writer_free(astreamer *streamer)
202{
203 astreamer_gzip_writer *mystreamer;
204
205 mystreamer = (astreamer_gzip_writer *) streamer;
206
207 Assert(mystreamer->base.bbs_next == NULL);
208 Assert(mystreamer->gzfile == NULL);
209
210 pfree(mystreamer->pathname);
211 pfree(mystreamer);
212}
213
214/*
215 * Helper function for libz error reporting.
216 */
217static const char *
218get_gz_error(gzFile gzf)
219{
220 int errnum;
221 const char *errmsg;
222
223 errmsg = gzerror(gzf, &errnum);
224 if (errnum == Z_ERRNO)
225 return strerror(errno);
226 else
227 return errmsg;
228}
229#endif
230
231/*
232 * Create a new base backup streamer that performs decompression of gzip
233 * compressed blocks.
234 */
235astreamer *
237{
238#ifdef HAVE_LIBZ
239 astreamer_gzip_decompressor *streamer;
240 z_stream *zs;
241
242 Assert(next != NULL);
243
244 streamer = palloc0(sizeof(astreamer_gzip_decompressor));
245 *((const astreamer_ops **) &streamer->base.bbs_ops) =
246 &astreamer_gzip_decompressor_ops;
247
248 streamer->base.bbs_next = next;
249 initStringInfo(&streamer->base.bbs_buffer);
250
251 /* Initialize internal stream state for decompression */
252 zs = &streamer->zstream;
253 zs->zalloc = gzip_palloc;
254 zs->zfree = gzip_pfree;
255 zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
256 zs->avail_out = streamer->base.bbs_buffer.maxlen;
257
258 /*
259 * Data compression was initialized using deflateInit2 to request a gzip
260 * header. Similarly, we are using inflateInit2 to initialize data
261 * decompression.
262 *
263 * Per the documentation for inflateInit2, the second argument is
264 * "windowBits" and its value must be greater than or equal to the value
265 * provided while compressing the data, so we are using the maximum
266 * possible value for safety.
267 */
268 if (inflateInit2(zs, 15 + 16) != Z_OK)
269 pg_fatal("could not initialize compression library");
270
271 return &streamer->base;
272#else
273 pg_fatal("this build does not support compression with %s", "gzip");
274 return NULL; /* keep compiler quiet */
275#endif
276}
277
278#ifdef HAVE_LIBZ
279/*
280 * Decompress the input data to output buffer until we run out of input
281 * data. Each time the output buffer is full, pass on the decompressed data
282 * to the next streamer.
283 */
284static void
285astreamer_gzip_decompressor_content(astreamer *streamer,
286 astreamer_member *member,
287 const char *data, int len,
289{
290 astreamer_gzip_decompressor *mystreamer;
291 z_stream *zs;
292
293 mystreamer = (astreamer_gzip_decompressor *) streamer;
294
295 zs = &mystreamer->zstream;
296 zs->next_in = (const uint8 *) data;
297 zs->avail_in = len;
298
299 /* Process the current chunk */
300 while (zs->avail_in > 0)
301 {
302 int res;
303
304 Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
305
306 zs->next_out = (uint8 *)
307 mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
308 zs->avail_out =
309 mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
310
311 /*
312 * This call decompresses data starting at zs->next_in and updates
313 * zs->next_in * and zs->avail_in. It generates output data starting
314 * at zs->next_out and updates zs->next_out and zs->avail_out
315 * accordingly.
316 */
317 res = inflate(zs, Z_NO_FLUSH);
318
319 if (res == Z_STREAM_ERROR)
320 pg_log_error("could not decompress data: %s", zs->msg);
321
322 mystreamer->bytes_written =
323 mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
324
325 /* If output buffer is full then pass data to next streamer */
326 if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
327 {
328 astreamer_content(mystreamer->base.bbs_next, member,
329 mystreamer->base.bbs_buffer.data,
330 mystreamer->base.bbs_buffer.maxlen, context);
331 mystreamer->bytes_written = 0;
332 }
333 }
334}
335
336/*
337 * End-of-stream processing.
338 */
339static void
340astreamer_gzip_decompressor_finalize(astreamer *streamer)
341{
342 astreamer_gzip_decompressor *mystreamer;
343
344 mystreamer = (astreamer_gzip_decompressor *) streamer;
345
346 /*
347 * End of the stream, if there is some pending data in output buffers then
348 * we must forward it to next streamer.
349 */
350 astreamer_content(mystreamer->base.bbs_next, NULL,
351 mystreamer->base.bbs_buffer.data,
352 mystreamer->base.bbs_buffer.maxlen,
354
355 astreamer_finalize(mystreamer->base.bbs_next);
356}
357
358/*
359 * Free memory.
360 */
361static void
362astreamer_gzip_decompressor_free(astreamer *streamer)
363{
364 astreamer_free(streamer->bbs_next);
365 pfree(streamer->bbs_buffer.data);
366 pfree(streamer);
367}
368
369/*
370 * Wrapper function to adjust the signature of palloc to match what libz
371 * expects.
372 */
373static void *
374gzip_palloc(void *opaque, unsigned items, unsigned size)
375{
376 return palloc(items * size);
377}
378
/*
 * Deallocation callback handed to zlib: forwards to pfree().  zlib's
 * free_func signature includes an opaque pointer, which is not needed here.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	(void) opaque;
	pfree(address);
}
388#endif
static void astreamer_free(astreamer *streamer)
Definition: astreamer.h:153
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition: astreamer.h:135
static void astreamer_finalize(astreamer *streamer)
Definition: astreamer.h:145
astreamer_archive_context
Definition: astreamer.h:63
@ ASTREAMER_UNKNOWN
Definition: astreamer.h:64
astreamer * astreamer_gzip_decompressor_new(astreamer *next)
astreamer * astreamer_gzip_writer_new(char *pathname, FILE *file, pg_compress_specification *compress)
static int32 next
Definition: blutils.c:224
uint8_t uint8
Definition: c.h:500
int errmsg(const char *fmt,...)
Definition: elog.c:1070
Assert(PointerIsAligned(start, uint64))
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:78
#define pg_log_error(...)
Definition: logging.h:106
char * pstrdup(const char *in)
Definition: mcxt.c:1699
void pfree(void *pointer)
Definition: mcxt.c:1524
void * palloc0(Size size)
Definition: mcxt.c:1347
void * palloc(Size size)
Definition: mcxt.c:1317
#define pg_fatal(...)
const void size_t len
const void * data
#define strerror
Definition: port.h:252
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition: astreamer.h:126
StringInfoData bbs_buffer
Definition: astreamer.h:111
astreamer * bbs_next
Definition: astreamer.h:110
static ItemArray items
Definition: test_tidstore.c:48