PostgreSQL Source Code  git master
compress_io.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * compress_io.c
4  * Routines for archivers to write an uncompressed or compressed data
5  * stream.
6  *
7  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * This file includes two APIs for dealing with compressed data. The first
11  * provides more flexibility, using callbacks to read/write data from the
12  * underlying stream. The second API is a wrapper around fopen/gzopen and
13  * friends, providing an interface similar to those, but abstracts away
14  * the possible compression. Both APIs use libz for the compression, but
15  * the second API uses gzip headers, so the resulting files can be easily
16  * manipulated with the gzip utility.
17  *
18  * Compressor API
19  * --------------
20  *
21  * The interface for writing to an archive consists of three functions:
22  * AllocateCompressor, WriteDataToArchive and EndCompressor. First you call
23  * AllocateCompressor, then write all the data by calling WriteDataToArchive
24  * as many times as needed, and finally EndCompressor. WriteDataToArchive
25  * and EndCompressor will call the WriteFunc that was provided to
26  * AllocateCompressor for each chunk of compressed data.
27  *
28  * The interface for reading an archive consists of just one function:
29  * ReadDataFromArchive. ReadDataFromArchive reads the whole compressed input
30  * stream, by repeatedly calling the given ReadFunc. ReadFunc returns the
31  * compressed data one chunk at a time, and ReadDataFromArchive decompresses it
32  * and passes the decompressed data to ahwrite(), until ReadFunc returns 0
33  * to signal EOF.
34  *
35  * The interface is the same for compressed and uncompressed streams.
36  *
37  * Compressed stream API
38  * ----------------------
39  *
40  * The compressed stream API is a wrapper around the C standard fopen() and
41  * libz's gzopen() APIs. It allows you to use the same functions for
42  * compressed and uncompressed streams. cfopen_read() first tries to open
43  * the file with given name, and if it fails, it tries to open the same
44  * file with the .gz suffix. cfopen_write() opens a file for writing, an
45  * extra argument specifies if the file should be compressed, and adds the
46  * .gz suffix to the filename if so. This allows you to easily handle both
47  * compressed and uncompressed files.
48  *
49  * IDENTIFICATION
50  * src/bin/pg_dump/compress_io.c
51  *
52  *-------------------------------------------------------------------------
53  */
54 #include "postgres_fe.h"
55 
56 #include "compress_io.h"
57 #include "pg_backup_utils.h"
58 
59 /*----------------------
60  * Compressor API
61  *----------------------
62  */
63 
64 /* typedef appears in compress_io.h */
66 {
69 
70 #ifdef HAVE_LIBZ
71  z_streamp zp;
72  char *zlibOut;
73  size_t zlibOutSize;
74 #endif
75 };
76 
77 static void ParseCompressionOption(int compression, CompressionAlgorithm *alg,
78  int *level);
79 
80 /* Routines that support zlib compressed data I/O */
81 #ifdef HAVE_LIBZ
82 static void InitCompressorZlib(CompressorState *cs, int level);
83 static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs,
84  bool flush);
85 static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF);
86 static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
87  const char *data, size_t dLen);
88 static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs);
89 #endif
90 
91 /* Routines that support uncompressed data I/O */
92 static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF);
94  const char *data, size_t dLen);
95 
96 /*
97  * Interprets a numeric 'compression' value. The algorithm implied by the
98  * value (zlib or none at the moment), is returned in *alg, and the
99  * zlib compression level in *level.
100  */
101 static void
102 ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level)
103 {
104  if (compression == Z_DEFAULT_COMPRESSION ||
105  (compression > 0 && compression <= 9))
106  *alg = COMPR_ALG_LIBZ;
107  else if (compression == 0)
108  *alg = COMPR_ALG_NONE;
109  else
110  {
111  fatal("invalid compression code: %d", compression);
112  *alg = COMPR_ALG_NONE; /* keep compiler quiet */
113  }
114 
115  /* The level is just the passed-in value. */
116  if (level)
117  *level = compression;
118 }
119 
120 /* Public interface routines */
121 
122 /* Allocate a new compressor */
125 {
126  CompressorState *cs;
128  int level;
129 
130  ParseCompressionOption(compression, &alg, &level);
131 
132 #ifndef HAVE_LIBZ
133  if (alg == COMPR_ALG_LIBZ)
134  fatal("not built with zlib support");
135 #endif
136 
137  cs = (CompressorState *) pg_malloc0(sizeof(CompressorState));
138  cs->writeF = writeF;
139  cs->comprAlg = alg;
140 
141  /*
142  * Perform compression algorithm specific initialization.
143  */
144 #ifdef HAVE_LIBZ
145  if (alg == COMPR_ALG_LIBZ)
146  InitCompressorZlib(cs, level);
147 #endif
148 
149  return cs;
150 }
151 
152 /*
153  * Read all compressed data from the input stream (via readF) and print it
154  * out with ahwrite().
155  */
156 void
157 ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
158 {
160 
161  ParseCompressionOption(compression, &alg, NULL);
162 
163  if (alg == COMPR_ALG_NONE)
164  ReadDataFromArchiveNone(AH, readF);
165  if (alg == COMPR_ALG_LIBZ)
166  {
167 #ifdef HAVE_LIBZ
168  ReadDataFromArchiveZlib(AH, readF);
169 #else
170  fatal("not built with zlib support");
171 #endif
172  }
173 }
174 
175 /*
176  * Compress and write data to the output stream (via writeF).
177  */
178 void
180  const void *data, size_t dLen)
181 {
182  switch (cs->comprAlg)
183  {
184  case COMPR_ALG_LIBZ:
185 #ifdef HAVE_LIBZ
186  WriteDataToArchiveZlib(AH, cs, data, dLen);
187 #else
188  fatal("not built with zlib support");
189 #endif
190  break;
191  case COMPR_ALG_NONE:
192  WriteDataToArchiveNone(AH, cs, data, dLen);
193  break;
194  }
195 }
196 
197 /*
198  * Terminate compression library context and flush its buffers.
199  */
200 void
202 {
203 #ifdef HAVE_LIBZ
204  if (cs->comprAlg == COMPR_ALG_LIBZ)
205  EndCompressorZlib(AH, cs);
206 #endif
207  free(cs);
208 }
209 
210 /* Private routines, specific to each compression method. */
211 
212 #ifdef HAVE_LIBZ
213 /*
214  * Functions for zlib compressed output.
215  */
216 
217 static void
218 InitCompressorZlib(CompressorState *cs, int level)
219 {
220  z_streamp zp;
221 
222  zp = cs->zp = (z_streamp) pg_malloc(sizeof(z_stream));
223  zp->zalloc = Z_NULL;
224  zp->zfree = Z_NULL;
225  zp->opaque = Z_NULL;
226 
227  /*
228  * zlibOutSize is the buffer size we tell zlib it can output to. We
229  * actually allocate one extra byte because some routines want to append a
230  * trailing zero byte to the zlib output.
231  */
232  cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
233  cs->zlibOutSize = ZLIB_OUT_SIZE;
234 
235  if (deflateInit(zp, level) != Z_OK)
236  fatal("could not initialize compression library: %s",
237  zp->msg);
238 
239  /* Just be paranoid - maybe End is called after Start, with no Write */
240  zp->next_out = (void *) cs->zlibOut;
241  zp->avail_out = cs->zlibOutSize;
242 }
243 
244 static void
245 EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs)
246 {
247  z_streamp zp = cs->zp;
248 
249  zp->next_in = NULL;
250  zp->avail_in = 0;
251 
252  /* Flush any remaining data from zlib buffer */
253  DeflateCompressorZlib(AH, cs, true);
254 
255  if (deflateEnd(zp) != Z_OK)
256  fatal("could not close compression stream: %s", zp->msg);
257 
258  free(cs->zlibOut);
259  free(cs->zp);
260 }
261 
262 static void
263 DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush)
264 {
265  z_streamp zp = cs->zp;
266  char *out = cs->zlibOut;
267  int res = Z_OK;
268 
269  while (cs->zp->avail_in != 0 || flush)
270  {
271  res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH);
272  if (res == Z_STREAM_ERROR)
273  fatal("could not compress data: %s", zp->msg);
274  if ((flush && (zp->avail_out < cs->zlibOutSize))
275  || (zp->avail_out == 0)
276  || (zp->avail_in != 0)
277  )
278  {
279  /*
280  * Extra paranoia: avoid zero-length chunks, since a zero length
281  * chunk is the EOF marker in the custom format. This should never
282  * happen but...
283  */
284  if (zp->avail_out < cs->zlibOutSize)
285  {
286  /*
287  * Any write function should do its own error checking but to
288  * make sure we do a check here as well...
289  */
290  size_t len = cs->zlibOutSize - zp->avail_out;
291 
292  cs->writeF(AH, out, len);
293  }
294  zp->next_out = (void *) out;
295  zp->avail_out = cs->zlibOutSize;
296  }
297 
298  if (res == Z_STREAM_END)
299  break;
300  }
301 }
302 
303 static void
304 WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
305  const char *data, size_t dLen)
306 {
307  cs->zp->next_in = (void *) unconstify(char *, data);
308  cs->zp->avail_in = dLen;
309  DeflateCompressorZlib(AH, cs, false);
310 }
311 
312 static void
313 ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF)
314 {
315  z_streamp zp;
316  char *out;
317  int res = Z_OK;
318  size_t cnt;
319  char *buf;
320  size_t buflen;
321 
322  zp = (z_streamp) pg_malloc(sizeof(z_stream));
323  zp->zalloc = Z_NULL;
324  zp->zfree = Z_NULL;
325  zp->opaque = Z_NULL;
326 
327  buf = pg_malloc(ZLIB_IN_SIZE);
328  buflen = ZLIB_IN_SIZE;
329 
330  out = pg_malloc(ZLIB_OUT_SIZE + 1);
331 
332  if (inflateInit(zp) != Z_OK)
333  fatal("could not initialize compression library: %s",
334  zp->msg);
335 
336  /* no minimal chunk size for zlib */
337  while ((cnt = readF(AH, &buf, &buflen)))
338  {
339  zp->next_in = (void *) buf;
340  zp->avail_in = cnt;
341 
342  while (zp->avail_in > 0)
343  {
344  zp->next_out = (void *) out;
345  zp->avail_out = ZLIB_OUT_SIZE;
346 
347  res = inflate(zp, 0);
348  if (res != Z_OK && res != Z_STREAM_END)
349  fatal("could not uncompress data: %s", zp->msg);
350 
351  out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
352  ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
353  }
354  }
355 
356  zp->next_in = NULL;
357  zp->avail_in = 0;
358  while (res != Z_STREAM_END)
359  {
360  zp->next_out = (void *) out;
361  zp->avail_out = ZLIB_OUT_SIZE;
362  res = inflate(zp, 0);
363  if (res != Z_OK && res != Z_STREAM_END)
364  fatal("could not uncompress data: %s", zp->msg);
365 
366  out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
367  ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
368  }
369 
370  if (inflateEnd(zp) != Z_OK)
371  fatal("could not close compression library: %s", zp->msg);
372 
373  free(buf);
374  free(out);
375  free(zp);
376 }
377 #endif /* HAVE_LIBZ */
378 
379 
380 /*
381  * Functions for uncompressed output.
382  */
383 
384 static void
386 {
387  size_t cnt;
388  char *buf;
389  size_t buflen;
390 
391  buf = pg_malloc(ZLIB_OUT_SIZE);
392  buflen = ZLIB_OUT_SIZE;
393 
394  while ((cnt = readF(AH, &buf, &buflen)))
395  {
396  ahwrite(buf, 1, cnt, AH);
397  }
398 
399  free(buf);
400 }
401 
402 static void
404  const char *data, size_t dLen)
405 {
406  cs->writeF(AH, data, dLen);
407 }
408 
409 
410 /*----------------------
411  * Compressed stream API
412  *----------------------
413  */
414 
/*
 * cfp represents an open stream, wrapping the underlying FILE or gzFile
 * pointer. This is opaque to the callers.
 *
 * cfopen() sets exactly one of the two members on success; the other is
 * NULL, which is how the cf* routines tell the two kinds of stream apart.
 */
struct cfp
{
	FILE	   *uncompressedfp; /* plain stdio stream, or NULL */
#ifdef HAVE_LIBZ
	gzFile		compressedfp;	/* zlib gz stream, or NULL */
#endif
};
426 
427 #ifdef HAVE_LIBZ
428 static int hasSuffix(const char *filename, const char *suffix);
429 #endif
430 
/*
 * free() without changing errno; useful in several places below.
 *
 * errno is saved before the call and restored afterwards, so a caller can
 * release memory on an error path and still report the original error code.
 */
static void
free_keep_errno(void *p)
{
	int			save_errno = errno;

	free(p);
	errno = save_errno;
}
440 
441 /*
442  * Open a file for reading. 'path' is the file to open, and 'mode' should
443  * be either "r" or "rb".
444  *
445  * If the file at 'path' does not exist, we append the ".gz" suffix (if 'path'
446  * doesn't already have it) and try again. So if you pass "foo" as 'path',
447  * this will open either "foo" or "foo.gz".
448  *
449  * On failure, return NULL with an error code in errno.
450  */
451 cfp *
452 cfopen_read(const char *path, const char *mode)
453 {
454  cfp *fp;
455 
456 #ifdef HAVE_LIBZ
457  if (hasSuffix(path, ".gz"))
458  fp = cfopen(path, mode, 1);
459  else
460 #endif
461  {
462  fp = cfopen(path, mode, 0);
463 #ifdef HAVE_LIBZ
464  if (fp == NULL)
465  {
466  char *fname;
467 
468  fname = psprintf("%s.gz", path);
469  fp = cfopen(fname, mode, 1);
470  free_keep_errno(fname);
471  }
472 #endif
473  }
474  return fp;
475 }
476 
477 /*
478  * Open a file for writing. 'path' indicates the path name, and 'mode' must
479  * be a filemode as accepted by fopen() and gzopen() that indicates writing
480  * ("w", "wb", "a", or "ab").
481  *
482  * If 'compression' is non-zero, a gzip compressed stream is opened, and
483  * 'compression' indicates the compression level used. The ".gz" suffix
484  * is automatically added to 'path' in that case.
485  *
486  * On failure, return NULL with an error code in errno.
487  */
488 cfp *
489 cfopen_write(const char *path, const char *mode, int compression)
490 {
491  cfp *fp;
492 
493  if (compression == 0)
494  fp = cfopen(path, mode, 0);
495  else
496  {
497 #ifdef HAVE_LIBZ
498  char *fname;
499 
500  fname = psprintf("%s.gz", path);
501  fp = cfopen(fname, mode, compression);
502  free_keep_errno(fname);
503 #else
504  fatal("not built with zlib support");
505  fp = NULL; /* keep compiler quiet */
506 #endif
507  }
508  return fp;
509 }
510 
511 /*
512  * Opens file 'path' in 'mode'. If 'compression' is non-zero, the file
513  * is opened with libz gzopen(), otherwise with plain fopen().
514  *
515  * On failure, return NULL with an error code in errno.
516  */
517 cfp *
518 cfopen(const char *path, const char *mode, int compression)
519 {
520  cfp *fp = pg_malloc(sizeof(cfp));
521 
522  if (compression != 0)
523  {
524 #ifdef HAVE_LIBZ
525  if (compression != Z_DEFAULT_COMPRESSION)
526  {
527  /* user has specified a compression level, so tell zlib to use it */
528  char mode_compression[32];
529 
530  snprintf(mode_compression, sizeof(mode_compression), "%s%d",
531  mode, compression);
532  fp->compressedfp = gzopen(path, mode_compression);
533  }
534  else
535  {
536  /* don't specify a level, just use the zlib default */
537  fp->compressedfp = gzopen(path, mode);
538  }
539 
540  fp->uncompressedfp = NULL;
541  if (fp->compressedfp == NULL)
542  {
543  free_keep_errno(fp);
544  fp = NULL;
545  }
546 #else
547  fatal("not built with zlib support");
548 #endif
549  }
550  else
551  {
552 #ifdef HAVE_LIBZ
553  fp->compressedfp = NULL;
554 #endif
555  fp->uncompressedfp = fopen(path, mode);
556  if (fp->uncompressedfp == NULL)
557  {
558  free_keep_errno(fp);
559  fp = NULL;
560  }
561  }
562 
563  return fp;
564 }
565 
566 
567 int
568 cfread(void *ptr, int size, cfp *fp)
569 {
570  int ret;
571 
572  if (size == 0)
573  return 0;
574 
575 #ifdef HAVE_LIBZ
576  if (fp->compressedfp)
577  {
578  ret = gzread(fp->compressedfp, ptr, size);
579  if (ret != size && !gzeof(fp->compressedfp))
580  {
581  int errnum;
582  const char *errmsg = gzerror(fp->compressedfp, &errnum);
583 
584  fatal("could not read from input file: %s",
585  errnum == Z_ERRNO ? strerror(errno) : errmsg);
586  }
587  }
588  else
589 #endif
590  {
591  ret = fread(ptr, 1, size, fp->uncompressedfp);
592  if (ret != size && !feof(fp->uncompressedfp))
594  }
595  return ret;
596 }
597 
598 int
599 cfwrite(const void *ptr, int size, cfp *fp)
600 {
601 #ifdef HAVE_LIBZ
602  if (fp->compressedfp)
603  return gzwrite(fp->compressedfp, ptr, size);
604  else
605 #endif
606  return fwrite(ptr, 1, size, fp->uncompressedfp);
607 }
608 
609 int
611 {
612  int ret;
613 
614 #ifdef HAVE_LIBZ
615  if (fp->compressedfp)
616  {
617  ret = gzgetc(fp->compressedfp);
618  if (ret == EOF)
619  {
620  if (!gzeof(fp->compressedfp))
621  fatal("could not read from input file: %s", strerror(errno));
622  else
623  fatal("could not read from input file: end of file");
624  }
625  }
626  else
627 #endif
628  {
629  ret = fgetc(fp->uncompressedfp);
630  if (ret == EOF)
632  }
633 
634  return ret;
635 }
636 
637 char *
638 cfgets(cfp *fp, char *buf, int len)
639 {
640 #ifdef HAVE_LIBZ
641  if (fp->compressedfp)
642  return gzgets(fp->compressedfp, buf, len);
643  else
644 #endif
645  return fgets(buf, len, fp->uncompressedfp);
646 }
647 
648 int
650 {
651  int result;
652 
653  if (fp == NULL)
654  {
655  errno = EBADF;
656  return EOF;
657  }
658 #ifdef HAVE_LIBZ
659  if (fp->compressedfp)
660  {
661  result = gzclose(fp->compressedfp);
662  fp->compressedfp = NULL;
663  }
664  else
665 #endif
666  {
667  result = fclose(fp->uncompressedfp);
668  fp->uncompressedfp = NULL;
669  }
670  free_keep_errno(fp);
671 
672  return result;
673 }
674 
675 int
676 cfeof(cfp *fp)
677 {
678 #ifdef HAVE_LIBZ
679  if (fp->compressedfp)
680  return gzeof(fp->compressedfp);
681  else
682 #endif
683  return feof(fp->uncompressedfp);
684 }
685 
686 const char *
688 {
689 #ifdef HAVE_LIBZ
690  if (fp->compressedfp)
691  {
692  int errnum;
693  const char *errmsg = gzerror(fp->compressedfp, &errnum);
694 
695  if (errnum != Z_ERRNO)
696  return errmsg;
697  }
698 #endif
699  return strerror(errno);
700 }
701 
702 #ifdef HAVE_LIBZ
/*
 * Return 1 if 'filename' ends with 'suffix', else 0.
 *
 * The comparison is an exact byte comparison of the last strlen(suffix)
 * bytes.  An empty suffix matches every filename.  Lengths are kept in
 * size_t so that strlen() results are not narrowed to int.
 */
static int
hasSuffix(const char *filename, const char *suffix)
{
	size_t		filenamelen = strlen(filename);
	size_t		suffixlen = strlen(suffix);

	/* a name shorter than the suffix cannot end with it */
	if (filenamelen < suffixlen)
		return 0;

	return memcmp(filename + (filenamelen - suffixlen),
				  suffix,
				  suffixlen) == 0;
}
716 
717 #endif
void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)
Definition: compress_io.c:179
static PgChecksumMode mode
Definition: pg_checksums.c:61
#define Z_DEFAULT_COMPRESSION
CompressionAlgorithm
Definition: compress_io.h:24
CompressorState * AllocateCompressor(int compression, WriteFunc writeF)
Definition: compress_io.c:124
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
int cfeof(cfp *fp)
Definition: compress_io.c:676
CompressionAlgorithm comprAlg
Definition: compress_io.c:67
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
static void free_keep_errno(void *p)
Definition: compress_io.c:433
FILE * uncompressedfp
Definition: compress_io.c:421
void(* WriteFunc)(ArchiveHandle *AH, const char *buf, size_t len)
Definition: compress_io.h:31
int cfread(void *ptr, int size, cfp *fp)
Definition: compress_io.c:568
WriteFunc writeF
Definition: compress_io.c:68
int cfclose(cfp *fp)
Definition: compress_io.c:649
static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, const char *data, size_t dLen)
Definition: compress_io.c:403
void ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
Definition: compress_io.c:157
static void ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level)
Definition: compress_io.c:102
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
cfp * cfopen(const char *path, const char *mode, int compression)
Definition: compress_io.c:518
static char * buf
Definition: pg_test_fsync.c:67
char * cfgets(cfp *fp, char *buf, int len)
Definition: compress_io.c:638
size_t(* ReadFunc)(ArchiveHandle *AH, char **buf, size_t *buflen)
Definition: compress_io.h:44
z_stream * z_streamp
#define ZLIB_IN_SIZE
Definition: compress_io.h:22
#define unconstify(underlying_type, expr)
Definition: c.h:1194
static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF)
Definition: compress_io.c:385
#define free(a)
Definition: header.h:65
int cfwrite(const void *ptr, int size, cfp *fp)
Definition: compress_io.c:599
#define strerror
Definition: port.h:205
int cfgetc(cfp *fp)
Definition: compress_io.c:610
#define fatal(...)
#define ZLIB_OUT_SIZE
Definition: walmethods.c:31
cfp * cfopen_write(const char *path, const char *mode, int compression)
Definition: compress_io.c:489
static char * filename
Definition: pg_dumpall.c:90
int errmsg(const char *fmt,...)
Definition: elog.c:822
cfp * cfopen_read(const char *path, const char *mode)
Definition: compress_io.c:452
#define READ_ERROR_EXIT(fd)
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
const char * get_cfp_error(cfp *fp)
Definition: compress_io.c:687
#define snprintf
Definition: port.h:192
void EndCompressor(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.c:201