PostgreSQL Source Code  git master
compress_io.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * compress_io.c
4  * Routines for archivers to write an uncompressed or compressed data
5  * stream.
6  *
7  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * This file includes two APIs for dealing with compressed data. The first
11  * provides more flexibility, using callbacks to read/write data from the
12  * underlying stream. The second API is a wrapper around fopen/gzopen and
13  * friends, providing an interface similar to those, but abstracts away
14  * the possible compression. Both APIs use libz for the compression, but
15  * the second API uses gzip headers, so the resulting files can be easily
16  * manipulated with the gzip utility.
17  *
18  * Compressor API
19  * --------------
20  *
21  * The interface for writing to an archive consists of three functions:
22  * AllocateCompressor, WriteDataToArchive and EndCompressor. First you call
23  * AllocateCompressor, then write all the data by calling WriteDataToArchive
24  * as many times as needed, and finally EndCompressor. WriteDataToArchive
25  * and EndCompressor will call the WriteFunc that was provided to
26  * AllocateCompressor for each chunk of compressed data.
27  *
28  * The interface for reading an archive consists of just one function:
29  * ReadDataFromArchive. ReadDataFromArchive reads the whole compressed input
30  * stream, by repeatedly calling the given ReadFunc. ReadFunc returns the
31  * compressed data one chunk at a time, and ReadDataFromArchive decompresses it
32  * and passes the decompressed data to ahwrite(), until ReadFunc returns 0
33  * to signal EOF.
34  *
35  * The interface is the same for compressed and uncompressed streams.
36  *
37  * Compressed stream API
38  * ----------------------
39  *
40  * The compressed stream API is a wrapper around the C standard fopen() and
41  * libz's gzopen() APIs. It allows you to use the same functions for
42  * compressed and uncompressed streams. cfopen_read() first tries to open
43  * the file with given name, and if it fails, it tries to open the same
44  * file with the .gz suffix. cfopen_write() opens a file for writing, an
45  * extra argument specifies if the file should be compressed, and adds the
46  * .gz suffix to the filename if so. This allows you to easily handle both
47  * compressed and uncompressed files.
48  *
49  * IDENTIFICATION
50  * src/bin/pg_dump/compress_io.c
51  *
52  *-------------------------------------------------------------------------
53  */
54 #include "postgres_fe.h"
55 
56 #include "compress_io.h"
57 #include "pg_backup_utils.h"
58 
59 /*----------------------
60  * Compressor API
61  *----------------------
62  */
63 
64 /* typedef appears in compress_io.h */
66 {
69 
70 #ifdef HAVE_LIBZ
71  z_streamp zp;
72  char *zlibOut;
73  size_t zlibOutSize;
74 #endif
75 };
76 
77 /* Routines that support zlib compressed data I/O */
78 #ifdef HAVE_LIBZ
79 static void InitCompressorZlib(CompressorState *cs, int level);
80 static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs,
81  bool flush);
82 static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF);
83 static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
84  const char *data, size_t dLen);
85 static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs);
86 #endif
87 
88 /* Routines that support uncompressed data I/O */
89 static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF);
91  const char *data, size_t dLen);
92 
93 /* Public interface routines */
94 
95 /* Allocate a new compressor */
98  WriteFunc writeF)
99 {
100  CompressorState *cs;
101 
102 #ifndef HAVE_LIBZ
103  if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
104  pg_fatal("not built with zlib support");
105 #endif
106 
107  cs = (CompressorState *) pg_malloc0(sizeof(CompressorState));
108  cs->writeF = writeF;
109  cs->compression_spec = compression_spec;
110 
111  /*
112  * Perform compression algorithm specific initialization.
113  */
114 #ifdef HAVE_LIBZ
116  InitCompressorZlib(cs, cs->compression_spec.level);
117 #endif
118 
119  return cs;
120 }
121 
122 /*
123  * Read all compressed data from the input stream (via readF) and print it
124  * out with ahwrite().
125  */
126 void
128  const pg_compress_specification compression_spec,
129  ReadFunc readF)
130 {
131  if (compression_spec.algorithm == PG_COMPRESSION_NONE)
132  ReadDataFromArchiveNone(AH, readF);
133  if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
134  {
135 #ifdef HAVE_LIBZ
136  ReadDataFromArchiveZlib(AH, readF);
137 #else
138  pg_fatal("not built with zlib support");
139 #endif
140  }
141 }
142 
143 /*
144  * Compress and write data to the output stream (via writeF).
145  */
146 void
148  const void *data, size_t dLen)
149 {
150  switch (cs->compression_spec.algorithm)
151  {
152  case PG_COMPRESSION_GZIP:
153 #ifdef HAVE_LIBZ
154  WriteDataToArchiveZlib(AH, cs, data, dLen);
155 #else
156  pg_fatal("not built with zlib support");
157 #endif
158  break;
159  case PG_COMPRESSION_NONE:
160  WriteDataToArchiveNone(AH, cs, data, dLen);
161  break;
162  case PG_COMPRESSION_LZ4:
163  /* fallthrough */
164  case PG_COMPRESSION_ZSTD:
165  pg_fatal("invalid compression method");
166  break;
167  }
168 }
169 
170 /*
171  * Terminate compression library context and flush its buffers.
172  */
173 void
175 {
176 #ifdef HAVE_LIBZ
178  EndCompressorZlib(AH, cs);
179 #endif
180  free(cs);
181 }
182 
183 /* Private routines, specific to each compression method. */
184 
185 #ifdef HAVE_LIBZ
186 /*
187  * Functions for zlib compressed output.
188  */
189 
190 static void
191 InitCompressorZlib(CompressorState *cs, int level)
192 {
193  z_streamp zp;
194 
195  zp = cs->zp = (z_streamp) pg_malloc(sizeof(z_stream));
196  zp->zalloc = Z_NULL;
197  zp->zfree = Z_NULL;
198  zp->opaque = Z_NULL;
199 
200  /*
201  * zlibOutSize is the buffer size we tell zlib it can output to. We
202  * actually allocate one extra byte because some routines want to append a
203  * trailing zero byte to the zlib output.
204  */
205  cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
206  cs->zlibOutSize = ZLIB_OUT_SIZE;
207 
208  if (deflateInit(zp, level) != Z_OK)
209  pg_fatal("could not initialize compression library: %s",
210  zp->msg);
211 
212  /* Just be paranoid - maybe End is called after Start, with no Write */
213  zp->next_out = (void *) cs->zlibOut;
214  zp->avail_out = cs->zlibOutSize;
215 }
216 
217 static void
218 EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs)
219 {
220  z_streamp zp = cs->zp;
221 
222  zp->next_in = NULL;
223  zp->avail_in = 0;
224 
225  /* Flush any remaining data from zlib buffer */
226  DeflateCompressorZlib(AH, cs, true);
227 
228  if (deflateEnd(zp) != Z_OK)
229  pg_fatal("could not close compression stream: %s", zp->msg);
230 
231  free(cs->zlibOut);
232  free(cs->zp);
233 }
234 
235 static void
236 DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush)
237 {
238  z_streamp zp = cs->zp;
239  char *out = cs->zlibOut;
240  int res = Z_OK;
241 
242  while (cs->zp->avail_in != 0 || flush)
243  {
244  res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH);
245  if (res == Z_STREAM_ERROR)
246  pg_fatal("could not compress data: %s", zp->msg);
247  if ((flush && (zp->avail_out < cs->zlibOutSize))
248  || (zp->avail_out == 0)
249  || (zp->avail_in != 0)
250  )
251  {
252  /*
253  * Extra paranoia: avoid zero-length chunks, since a zero length
254  * chunk is the EOF marker in the custom format. This should never
255  * happen but...
256  */
257  if (zp->avail_out < cs->zlibOutSize)
258  {
259  /*
260  * Any write function should do its own error checking but to
261  * make sure we do a check here as well...
262  */
263  size_t len = cs->zlibOutSize - zp->avail_out;
264 
265  cs->writeF(AH, out, len);
266  }
267  zp->next_out = (void *) out;
268  zp->avail_out = cs->zlibOutSize;
269  }
270 
271  if (res == Z_STREAM_END)
272  break;
273  }
274 }
275 
276 static void
277 WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
278  const char *data, size_t dLen)
279 {
280  cs->zp->next_in = (void *) unconstify(char *, data);
281  cs->zp->avail_in = dLen;
282  DeflateCompressorZlib(AH, cs, false);
283 }
284 
285 static void
286 ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF)
287 {
288  z_streamp zp;
289  char *out;
290  int res = Z_OK;
291  size_t cnt;
292  char *buf;
293  size_t buflen;
294 
295  zp = (z_streamp) pg_malloc(sizeof(z_stream));
296  zp->zalloc = Z_NULL;
297  zp->zfree = Z_NULL;
298  zp->opaque = Z_NULL;
299 
301  buflen = ZLIB_IN_SIZE;
302 
303  out = pg_malloc(ZLIB_OUT_SIZE + 1);
304 
305  if (inflateInit(zp) != Z_OK)
306  pg_fatal("could not initialize compression library: %s",
307  zp->msg);
308 
309  /* no minimal chunk size for zlib */
310  while ((cnt = readF(AH, &buf, &buflen)))
311  {
312  zp->next_in = (void *) buf;
313  zp->avail_in = cnt;
314 
315  while (zp->avail_in > 0)
316  {
317  zp->next_out = (void *) out;
318  zp->avail_out = ZLIB_OUT_SIZE;
319 
320  res = inflate(zp, 0);
321  if (res != Z_OK && res != Z_STREAM_END)
322  pg_fatal("could not uncompress data: %s", zp->msg);
323 
324  out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
325  ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
326  }
327  }
328 
329  zp->next_in = NULL;
330  zp->avail_in = 0;
331  while (res != Z_STREAM_END)
332  {
333  zp->next_out = (void *) out;
334  zp->avail_out = ZLIB_OUT_SIZE;
335  res = inflate(zp, 0);
336  if (res != Z_OK && res != Z_STREAM_END)
337  pg_fatal("could not uncompress data: %s", zp->msg);
338 
339  out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
340  ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
341  }
342 
343  if (inflateEnd(zp) != Z_OK)
344  pg_fatal("could not close compression library: %s", zp->msg);
345 
346  free(buf);
347  free(out);
348  free(zp);
349 }
350 #endif /* HAVE_LIBZ */
351 
352 
353 /*
354  * Functions for uncompressed output.
355  */
356 
357 static void
359 {
360  size_t cnt;
361  char *buf;
362  size_t buflen;
363 
365  buflen = ZLIB_OUT_SIZE;
366 
367  while ((cnt = readF(AH, &buf, &buflen)))
368  {
369  ahwrite(buf, 1, cnt, AH);
370  }
371 
372  free(buf);
373 }
374 
375 static void
377  const char *data, size_t dLen)
378 {
379  cs->writeF(AH, data, dLen);
380 }
381 
382 
383 /*----------------------
384  * Compressed stream API
385  *----------------------
386  */
387 
/*
 * cfp represents an open stream, wrapping the underlying FILE or gzFile
 * pointer. This is opaque to the callers.
 *
 * cfopen() sets exactly one of the two members and leaves the other NULL.
 */
struct cfp
{
	FILE	   *uncompressedfp;	/* plain stdio stream, or NULL */
#ifdef HAVE_LIBZ
	gzFile		compressedfp;	/* zlib gz stream, or NULL */
#endif
};
399 
#ifdef HAVE_LIBZ
/* Forward declaration; the definition is at the end of the file. */
static int	hasSuffix(const char *filename,
					  const char *suffix);
#endif
403 
/*
 * free() without changing errno; useful in several places below.
 *
 * errno is saved before the call and restored after it, so the error code
 * of an earlier failure is not lost while cleaning up.
 */
static void
free_keep_errno(void *p)
{
	int			save_errno = errno;

	free(p);
	errno = save_errno;
}
413 
414 /*
415  * Open a file for reading. 'path' is the file to open, and 'mode' should
416  * be either "r" or "rb".
417  *
418  * If the file at 'path' does not exist, we append the ".gz" suffix (if 'path'
419  * doesn't already have it) and try again. So if you pass "foo" as 'path',
420  * this will open either "foo" or "foo.gz".
421  *
422  * On failure, return NULL with an error code in errno.
423  */
424 cfp *
425 cfopen_read(const char *path, const char *mode)
426 {
427  cfp *fp;
428 
429  pg_compress_specification compression_spec = {0};
430 
431 #ifdef HAVE_LIBZ
432  if (hasSuffix(path, ".gz"))
433  {
434  compression_spec.algorithm = PG_COMPRESSION_GZIP;
435  fp = cfopen(path, mode, compression_spec);
436  }
437  else
438 #endif
439  {
440  compression_spec.algorithm = PG_COMPRESSION_NONE;
441  fp = cfopen(path, mode, compression_spec);
442 #ifdef HAVE_LIBZ
443  if (fp == NULL)
444  {
445  char *fname;
446 
447  fname = psprintf("%s.gz", path);
448  compression_spec.algorithm = PG_COMPRESSION_GZIP;
449  fp = cfopen(fname, mode, compression_spec);
450  free_keep_errno(fname);
451  }
452 #endif
453  }
454  return fp;
455 }
456 
457 /*
458  * Open a file for writing. 'path' indicates the path name, and 'mode' must
459  * be a filemode as accepted by fopen() and gzopen() that indicates writing
460  * ("w", "wb", "a", or "ab").
461  *
462  * If 'compression_spec.algorithm' is GZIP, a gzip compressed stream is opened,
463  * and 'compression_spec.level' used. The ".gz" suffix is automatically added to
464  * 'path' in that case.
465  *
466  * On failure, return NULL with an error code in errno.
467  */
468 cfp *
469 cfopen_write(const char *path, const char *mode,
470  const pg_compress_specification compression_spec)
471 {
472  cfp *fp;
473 
474  if (compression_spec.algorithm == PG_COMPRESSION_NONE)
475  fp = cfopen(path, mode, compression_spec);
476  else
477  {
478 #ifdef HAVE_LIBZ
479  char *fname;
480 
481  fname = psprintf("%s.gz", path);
482  fp = cfopen(fname, mode, compression_spec);
483  free_keep_errno(fname);
484 #else
485  pg_fatal("not built with zlib support");
486  fp = NULL; /* keep compiler quiet */
487 #endif
488  }
489  return fp;
490 }
491 
492 /*
493  * Opens file 'path' in 'mode'. If compression is GZIP, the file
494  * is opened with libz gzopen(), otherwise with plain fopen().
495  *
496  * On failure, return NULL with an error code in errno.
497  */
498 cfp *
499 cfopen(const char *path, const char *mode,
500  const pg_compress_specification compression_spec)
501 {
502  cfp *fp = pg_malloc(sizeof(cfp));
503 
504  if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
505  {
506 #ifdef HAVE_LIBZ
507  if (compression_spec.level != Z_DEFAULT_COMPRESSION)
508  {
509  /* user has specified a compression level, so tell zlib to use it */
510  char mode_compression[32];
511 
512  snprintf(mode_compression, sizeof(mode_compression), "%s%d",
513  mode, compression_spec.level);
514  fp->compressedfp = gzopen(path, mode_compression);
515  }
516  else
517  {
518  /* don't specify a level, just use the zlib default */
519  fp->compressedfp = gzopen(path, mode);
520  }
521 
522  fp->uncompressedfp = NULL;
523  if (fp->compressedfp == NULL)
524  {
525  free_keep_errno(fp);
526  fp = NULL;
527  }
528 #else
529  pg_fatal("not built with zlib support");
530 #endif
531  }
532  else
533  {
534 #ifdef HAVE_LIBZ
535  fp->compressedfp = NULL;
536 #endif
537  fp->uncompressedfp = fopen(path, mode);
538  if (fp->uncompressedfp == NULL)
539  {
540  free_keep_errno(fp);
541  fp = NULL;
542  }
543  }
544 
545  return fp;
546 }
547 
548 
549 int
550 cfread(void *ptr, int size, cfp *fp)
551 {
552  int ret;
553 
554  if (size == 0)
555  return 0;
556 
557 #ifdef HAVE_LIBZ
558  if (fp->compressedfp)
559  {
560  ret = gzread(fp->compressedfp, ptr, size);
561  if (ret != size && !gzeof(fp->compressedfp))
562  {
563  int errnum;
564  const char *errmsg = gzerror(fp->compressedfp, &errnum);
565 
566  pg_fatal("could not read from input file: %s",
567  errnum == Z_ERRNO ? strerror(errno) : errmsg);
568  }
569  }
570  else
571 #endif
572  {
573  ret = fread(ptr, 1, size, fp->uncompressedfp);
574  if (ret != size && !feof(fp->uncompressedfp))
576  }
577  return ret;
578 }
579 
580 int
581 cfwrite(const void *ptr, int size, cfp *fp)
582 {
583 #ifdef HAVE_LIBZ
584  if (fp->compressedfp)
585  return gzwrite(fp->compressedfp, ptr, size);
586  else
587 #endif
588  return fwrite(ptr, 1, size, fp->uncompressedfp);
589 }
590 
591 int
593 {
594  int ret;
595 
596 #ifdef HAVE_LIBZ
597  if (fp->compressedfp)
598  {
599  ret = gzgetc(fp->compressedfp);
600  if (ret == EOF)
601  {
602  if (!gzeof(fp->compressedfp))
603  pg_fatal("could not read from input file: %s", strerror(errno));
604  else
605  pg_fatal("could not read from input file: end of file");
606  }
607  }
608  else
609 #endif
610  {
611  ret = fgetc(fp->uncompressedfp);
612  if (ret == EOF)
614  }
615 
616  return ret;
617 }
618 
619 char *
620 cfgets(cfp *fp, char *buf, int len)
621 {
622 #ifdef HAVE_LIBZ
623  if (fp->compressedfp)
624  return gzgets(fp->compressedfp, buf, len);
625  else
626 #endif
627  return fgets(buf, len, fp->uncompressedfp);
628 }
629 
630 int
632 {
633  int result;
634 
635  if (fp == NULL)
636  {
637  errno = EBADF;
638  return EOF;
639  }
640 #ifdef HAVE_LIBZ
641  if (fp->compressedfp)
642  {
643  result = gzclose(fp->compressedfp);
644  fp->compressedfp = NULL;
645  }
646  else
647 #endif
648  {
649  result = fclose(fp->uncompressedfp);
650  fp->uncompressedfp = NULL;
651  }
652  free_keep_errno(fp);
653 
654  return result;
655 }
656 
657 int
658 cfeof(cfp *fp)
659 {
660 #ifdef HAVE_LIBZ
661  if (fp->compressedfp)
662  return gzeof(fp->compressedfp);
663  else
664 #endif
665  return feof(fp->uncompressedfp);
666 }
667 
668 const char *
670 {
671 #ifdef HAVE_LIBZ
672  if (fp->compressedfp)
673  {
674  int errnum;
675  const char *errmsg = gzerror(fp->compressedfp, &errnum);
676 
677  if (errnum != Z_ERRNO)
678  return errmsg;
679  }
680 #endif
681  return strerror(errno);
682 }
683 
684 #ifdef HAVE_LIBZ
/*
 * Return 1 if 'filename' ends with 'suffix', 0 otherwise.
 *
 * Lengths are kept as size_t so that strlen() results are not narrowed to
 * int.  An empty suffix matches every filename.
 */
static int
hasSuffix(const char *filename, const char *suffix)
{
	size_t		filenamelen = strlen(filename);
	size_t		suffixlen = strlen(suffix);

	if (filenamelen < suffixlen)
		return 0;

	return memcmp(filename + (filenamelen - suffixlen),
				  suffix,
				  suffixlen) == 0;
}
698 
699 #endif
#define unconstify(underlying_type, expr)
Definition: c.h:1181
int cfwrite(const void *ptr, int size, cfp *fp)
Definition: compress_io.c:581
static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, const char *data, size_t dLen)
Definition: compress_io.c:376
char * cfgets(cfp *fp, char *buf, int len)
Definition: compress_io.c:620
static void free_keep_errno(void *p)
Definition: compress_io.c:406
cfp * cfopen(const char *path, const char *mode, const pg_compress_specification compression_spec)
Definition: compress_io.c:499
void ReadDataFromArchive(ArchiveHandle *AH, const pg_compress_specification compression_spec, ReadFunc readF)
Definition: compress_io.c:127
void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)
Definition: compress_io.c:147
int cfclose(cfp *fp)
Definition: compress_io.c:631
cfp * cfopen_write(const char *path, const char *mode, const pg_compress_specification compression_spec)
Definition: compress_io.c:469
int cfread(void *ptr, int size, cfp *fp)
Definition: compress_io.c:550
CompressorState * AllocateCompressor(const pg_compress_specification compression_spec, WriteFunc writeF)
Definition: compress_io.c:97
void EndCompressor(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.c:174
int cfgetc(cfp *fp)
Definition: compress_io.c:592
cfp * cfopen_read(const char *path, const char *mode)
Definition: compress_io.c:425
int cfeof(cfp *fp)
Definition: compress_io.c:658
const char * get_cfp_error(cfp *fp)
Definition: compress_io.c:669
static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF)
Definition: compress_io.c:358
size_t(* ReadFunc)(ArchiveHandle *AH, char **buf, size_t *buflen)
Definition: compress_io.h:38
#define ZLIB_IN_SIZE
Definition: compress_io.h:22
void(* WriteFunc)(ArchiveHandle *AH, const char *buf, size_t len)
Definition: compress_io.h:25
@ PG_COMPRESSION_GZIP
Definition: compression.h:20
@ PG_COMPRESSION_LZ4
Definition: compression.h:21
@ PG_COMPRESSION_NONE
Definition: compression.h:19
@ PG_COMPRESSION_ZSTD
Definition: compression.h:22
int errmsg(const char *fmt,...)
Definition: elog.c:946
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define free(a)
Definition: header.h:65
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
z_stream * z_streamp
#define READ_ERROR_EXIT(fd)
#define Z_DEFAULT_COMPRESSION
#define pg_fatal(...)
static PgChecksumMode mode
Definition: pg_checksums.c:65
const void size_t len
const void * data
static char * filename
Definition: pg_dumpall.c:119
static char * buf
Definition: pg_test_fsync.c:67
#define strerror
Definition: port.h:251
#define snprintf
Definition: port.h:238
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
pg_compress_specification compression_spec
Definition: compress_io.c:67
WriteFunc writeF
Definition: compress_io.c:68
FILE * uncompressedfp
Definition: compress_io.c:394
pg_compress_algorithm algorithm
Definition: compression.h:29
#define ZLIB_OUT_SIZE
Definition: walmethods.c:33