PostgreSQL Source Code  git master
compress_io.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * compress_io.c
4  * Routines for archivers to write an uncompressed or compressed data
5  * stream.
6  *
7  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * This file includes two APIs for dealing with compressed data. The first
11  * provides more flexibility, using callbacks to read/write data from the
12  * underlying stream. The second API is a wrapper around fopen/gzopen and
13  * friends, providing an interface similar to those, but abstracts away
14  * the possible compression. Both APIs use libz for the compression, but
15  * the second API uses gzip headers, so the resulting files can be easily
16  * manipulated with the gzip utility.
17  *
18  * Compressor API
19  * --------------
20  *
21  * The interface for writing to an archive consists of three functions:
22  * AllocateCompressor, WriteDataToArchive and EndCompressor. First you call
23  * AllocateCompressor, then write all the data by calling WriteDataToArchive
24  * as many times as needed, and finally EndCompressor. WriteDataToArchive
25  * and EndCompressor will call the WriteFunc that was provided to
26  * AllocateCompressor for each chunk of compressed data.
27  *
28  * The interface for reading an archive consists of just one function:
29  * ReadDataFromArchive. ReadDataFromArchive reads the whole compressed input
30  * stream, by repeatedly calling the given ReadFunc. ReadFunc returns the
31  * compressed data one chunk at a time, and ReadDataFromArchive decompresses it
32  * and passes the decompressed data to ahwrite(), until ReadFunc returns 0
33  * to signal EOF.
34  *
35  * The interface is the same for compressed and uncompressed streams.
36  *
37  * Compressed stream API
38  * ----------------------
39  *
40  * The compressed stream API is a wrapper around the C standard fopen() and
41  * libz's gzopen() APIs. It allows you to use the same functions for
42  * compressed and uncompressed streams. cfopen_read() first tries to open
43  * the file with given name, and if it fails, it tries to open the same
44  * file with the .gz suffix. cfopen_write() opens a file for writing; an
45  * extra argument specifies whether the file should be compressed, in which
46  * case the .gz suffix is added to the filename. This allows you to easily handle both
47  * compressed and uncompressed files.
48  *
49  * IDENTIFICATION
50  * src/bin/pg_dump/compress_io.c
51  *
52  *-------------------------------------------------------------------------
53  */
54 #include "postgres_fe.h"
55 
56 #include "compress_io.h"
57 #include "pg_backup_utils.h"
58 
59 /*----------------------
60  * Compressor API
61  *----------------------
62  */
63 
64 /* typedef appears in compress_io.h */
66 {
69 
70 #ifdef HAVE_LIBZ
71  z_streamp zp;
72  char *zlibOut;
73  size_t zlibOutSize;
74 #endif
75 };
76 
77 static void ParseCompressionOption(int compression, CompressionAlgorithm *alg,
78  int *level);
79 
80 /* Routines that support zlib compressed data I/O */
81 #ifdef HAVE_LIBZ
82 static void InitCompressorZlib(CompressorState *cs, int level);
83 static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs,
84  bool flush);
85 static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF);
86 static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
87  const char *data, size_t dLen);
88 static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs);
89 #endif
90 
91 /* Routines that support uncompressed data I/O */
92 static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF);
94  const char *data, size_t dLen);
95 
96 /*
97  * Interprets a numeric 'compression' value. The algorithm implied by the
98  * value (zlib or none at the moment), is returned in *alg, and the
99  * zlib compression level in *level.
100  */
101 static void
102 ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level)
103 {
104  if (compression == Z_DEFAULT_COMPRESSION ||
105  (compression > 0 && compression <= 9))
106  *alg = COMPR_ALG_LIBZ;
107  else if (compression == 0)
108  *alg = COMPR_ALG_NONE;
109  else
110  {
111  fatal("invalid compression code: %d", compression);
112  *alg = COMPR_ALG_NONE; /* keep compiler quiet */
113  }
114 
115  /* The level is just the passed-in value. */
116  if (level)
117  *level = compression;
118 }
119 
120 /* Public interface routines */
121 
122 /* Allocate a new compressor */
125 {
126  CompressorState *cs;
128  int level;
129 
130  ParseCompressionOption(compression, &alg, &level);
131 
132 #ifndef HAVE_LIBZ
133  if (alg == COMPR_ALG_LIBZ)
134  fatal("not built with zlib support");
135 #endif
136 
137  cs = (CompressorState *) pg_malloc0(sizeof(CompressorState));
138  cs->writeF = writeF;
139  cs->comprAlg = alg;
140 
141  /*
142  * Perform compression algorithm specific initialization.
143  */
144 #ifdef HAVE_LIBZ
145  if (alg == COMPR_ALG_LIBZ)
146  InitCompressorZlib(cs, level);
147 #endif
148 
149  return cs;
150 }
151 
152 /*
153  * Read all compressed data from the input stream (via readF) and print it
154  * out with ahwrite().
155  */
156 void
157 ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
158 {
160 
161  ParseCompressionOption(compression, &alg, NULL);
162 
163  if (alg == COMPR_ALG_NONE)
164  ReadDataFromArchiveNone(AH, readF);
165  if (alg == COMPR_ALG_LIBZ)
166  {
167 #ifdef HAVE_LIBZ
168  ReadDataFromArchiveZlib(AH, readF);
169 #else
170  fatal("not built with zlib support");
171 #endif
172  }
173 }
174 
175 /*
176  * Compress and write data to the output stream (via writeF).
177  */
178 void
180  const void *data, size_t dLen)
181 {
182  switch (cs->comprAlg)
183  {
184  case COMPR_ALG_LIBZ:
185 #ifdef HAVE_LIBZ
186  WriteDataToArchiveZlib(AH, cs, data, dLen);
187 #else
188  fatal("not built with zlib support");
189 #endif
190  break;
191  case COMPR_ALG_NONE:
192  WriteDataToArchiveNone(AH, cs, data, dLen);
193  break;
194  }
195  return;
196 }
197 
198 /*
199  * Terminate compression library context and flush its buffers.
200  */
201 void
203 {
204 #ifdef HAVE_LIBZ
205  if (cs->comprAlg == COMPR_ALG_LIBZ)
206  EndCompressorZlib(AH, cs);
207 #endif
208  free(cs);
209 }
210 
211 /* Private routines, specific to each compression method. */
212 
213 #ifdef HAVE_LIBZ
214 /*
215  * Functions for zlib compressed output.
216  */
217 
218 static void
219 InitCompressorZlib(CompressorState *cs, int level)
220 {
221  z_streamp zp;
222 
223  zp = cs->zp = (z_streamp) pg_malloc(sizeof(z_stream));
224  zp->zalloc = Z_NULL;
225  zp->zfree = Z_NULL;
226  zp->opaque = Z_NULL;
227 
228  /*
229  * zlibOutSize is the buffer size we tell zlib it can output to. We
230  * actually allocate one extra byte because some routines want to append a
231  * trailing zero byte to the zlib output.
232  */
233  cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
234  cs->zlibOutSize = ZLIB_OUT_SIZE;
235 
236  if (deflateInit(zp, level) != Z_OK)
237  fatal("could not initialize compression library: %s",
238  zp->msg);
239 
240  /* Just be paranoid - maybe End is called after Start, with no Write */
241  zp->next_out = (void *) cs->zlibOut;
242  zp->avail_out = cs->zlibOutSize;
243 }
244 
245 static void
246 EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs)
247 {
248  z_streamp zp = cs->zp;
249 
250  zp->next_in = NULL;
251  zp->avail_in = 0;
252 
253  /* Flush any remaining data from zlib buffer */
254  DeflateCompressorZlib(AH, cs, true);
255 
256  if (deflateEnd(zp) != Z_OK)
257  fatal("could not close compression stream: %s", zp->msg);
258 
259  free(cs->zlibOut);
260  free(cs->zp);
261 }
262 
263 static void
264 DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush)
265 {
266  z_streamp zp = cs->zp;
267  char *out = cs->zlibOut;
268  int res = Z_OK;
269 
270  while (cs->zp->avail_in != 0 || flush)
271  {
272  res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH);
273  if (res == Z_STREAM_ERROR)
274  fatal("could not compress data: %s", zp->msg);
275  if ((flush && (zp->avail_out < cs->zlibOutSize))
276  || (zp->avail_out == 0)
277  || (zp->avail_in != 0)
278  )
279  {
280  /*
281  * Extra paranoia: avoid zero-length chunks, since a zero length
282  * chunk is the EOF marker in the custom format. This should never
283  * happen but...
284  */
285  if (zp->avail_out < cs->zlibOutSize)
286  {
287  /*
288  * Any write function should do its own error checking but to
289  * make sure we do a check here as well...
290  */
291  size_t len = cs->zlibOutSize - zp->avail_out;
292 
293  cs->writeF(AH, out, len);
294  }
295  zp->next_out = (void *) out;
296  zp->avail_out = cs->zlibOutSize;
297  }
298 
299  if (res == Z_STREAM_END)
300  break;
301  }
302 }
303 
304 static void
305 WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
306  const char *data, size_t dLen)
307 {
308  cs->zp->next_in = (void *) unconstify(char *, data);
309  cs->zp->avail_in = dLen;
310  DeflateCompressorZlib(AH, cs, false);
311 
312  return;
313 }
314 
315 static void
316 ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF)
317 {
318  z_streamp zp;
319  char *out;
320  int res = Z_OK;
321  size_t cnt;
322  char *buf;
323  size_t buflen;
324 
325  zp = (z_streamp) pg_malloc(sizeof(z_stream));
326  zp->zalloc = Z_NULL;
327  zp->zfree = Z_NULL;
328  zp->opaque = Z_NULL;
329 
330  buf = pg_malloc(ZLIB_IN_SIZE);
331  buflen = ZLIB_IN_SIZE;
332 
333  out = pg_malloc(ZLIB_OUT_SIZE + 1);
334 
335  if (inflateInit(zp) != Z_OK)
336  fatal("could not initialize compression library: %s",
337  zp->msg);
338 
339  /* no minimal chunk size for zlib */
340  while ((cnt = readF(AH, &buf, &buflen)))
341  {
342  zp->next_in = (void *) buf;
343  zp->avail_in = cnt;
344 
345  while (zp->avail_in > 0)
346  {
347  zp->next_out = (void *) out;
348  zp->avail_out = ZLIB_OUT_SIZE;
349 
350  res = inflate(zp, 0);
351  if (res != Z_OK && res != Z_STREAM_END)
352  fatal("could not uncompress data: %s", zp->msg);
353 
354  out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
355  ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
356  }
357  }
358 
359  zp->next_in = NULL;
360  zp->avail_in = 0;
361  while (res != Z_STREAM_END)
362  {
363  zp->next_out = (void *) out;
364  zp->avail_out = ZLIB_OUT_SIZE;
365  res = inflate(zp, 0);
366  if (res != Z_OK && res != Z_STREAM_END)
367  fatal("could not uncompress data: %s", zp->msg);
368 
369  out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
370  ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
371  }
372 
373  if (inflateEnd(zp) != Z_OK)
374  fatal("could not close compression library: %s", zp->msg);
375 
376  free(buf);
377  free(out);
378  free(zp);
379 }
380 #endif /* HAVE_LIBZ */
381 
382 
383 /*
384  * Functions for uncompressed output.
385  */
386 
387 static void
389 {
390  size_t cnt;
391  char *buf;
392  size_t buflen;
393 
394  buf = pg_malloc(ZLIB_OUT_SIZE);
395  buflen = ZLIB_OUT_SIZE;
396 
397  while ((cnt = readF(AH, &buf, &buflen)))
398  {
399  ahwrite(buf, 1, cnt, AH);
400  }
401 
402  free(buf);
403 }
404 
405 static void
407  const char *data, size_t dLen)
408 {
409  cs->writeF(AH, data, dLen);
410  return;
411 }
412 
413 
414 /*----------------------
415  * Compressed stream API
416  *----------------------
417  */
418 
/*
 * cfp represents an open stream, wrapping the underlying FILE or gzFile
 * pointer.  This is opaque to the callers.
 *
 * cfopen() sets exactly one of the two members non-NULL: uncompressedfp for
 * a plain file, compressedfp for a gzip file.
 */
struct cfp
{
	FILE	   *uncompressedfp;
#ifdef HAVE_LIBZ
	gzFile		compressedfp;
#endif
};
430 
#ifdef HAVE_LIBZ
/* Suffix test; cfopen_read() uses it to recognize ".gz" file names. */
static int	hasSuffix(const char *filename, const char *suffix);
#endif
434 
/*
 * free() without changing errno; useful in several places below.
 *
 * errno is saved before the call and restored afterwards, so a caller that
 * is about to report a failure through errno can release memory first.
 */
static void
free_keep_errno(void *p)
{
	int			save_errno = errno;

	free(p);
	errno = save_errno;
}
444 
445 /*
446  * Open a file for reading. 'path' is the file to open, and 'mode' should
447  * be either "r" or "rb".
448  *
449  * If the file at 'path' does not exist, we append the ".gz" suffix (if 'path'
450  * doesn't already have it) and try again. So if you pass "foo" as 'path',
451  * this will open either "foo" or "foo.gz".
452  *
453  * On failure, return NULL with an error code in errno.
454  */
455 cfp *
456 cfopen_read(const char *path, const char *mode)
457 {
458  cfp *fp;
459 
460 #ifdef HAVE_LIBZ
461  if (hasSuffix(path, ".gz"))
462  fp = cfopen(path, mode, 1);
463  else
464 #endif
465  {
466  fp = cfopen(path, mode, 0);
467 #ifdef HAVE_LIBZ
468  if (fp == NULL)
469  {
470  char *fname;
471 
472  fname = psprintf("%s.gz", path);
473  fp = cfopen(fname, mode, 1);
474  free_keep_errno(fname);
475  }
476 #endif
477  }
478  return fp;
479 }
480 
481 /*
482  * Open a file for writing. 'path' indicates the path name, and 'mode' must
483  * be a filemode as accepted by fopen() and gzopen() that indicates writing
484  * ("w", "wb", "a", or "ab").
485  *
486  * If 'compression' is non-zero, a gzip compressed stream is opened, and
487  * 'compression' indicates the compression level used. The ".gz" suffix
488  * is automatically added to 'path' in that case.
489  *
490  * On failure, return NULL with an error code in errno.
491  */
492 cfp *
493 cfopen_write(const char *path, const char *mode, int compression)
494 {
495  cfp *fp;
496 
497  if (compression == 0)
498  fp = cfopen(path, mode, 0);
499  else
500  {
501 #ifdef HAVE_LIBZ
502  char *fname;
503 
504  fname = psprintf("%s.gz", path);
505  fp = cfopen(fname, mode, compression);
506  free_keep_errno(fname);
507 #else
508  fatal("not built with zlib support");
509  fp = NULL; /* keep compiler quiet */
510 #endif
511  }
512  return fp;
513 }
514 
515 /*
516  * Opens file 'path' in 'mode'. If 'compression' is non-zero, the file
517  * is opened with libz gzopen(), otherwise with plain fopen().
518  *
519  * On failure, return NULL with an error code in errno.
520  */
521 cfp *
522 cfopen(const char *path, const char *mode, int compression)
523 {
524  cfp *fp = pg_malloc(sizeof(cfp));
525 
526  if (compression != 0)
527  {
528 #ifdef HAVE_LIBZ
529  if (compression != Z_DEFAULT_COMPRESSION)
530  {
531  /* user has specified a compression level, so tell zlib to use it */
532  char mode_compression[32];
533 
534  snprintf(mode_compression, sizeof(mode_compression), "%s%d",
535  mode, compression);
536  fp->compressedfp = gzopen(path, mode_compression);
537  }
538  else
539  {
540  /* don't specify a level, just use the zlib default */
541  fp->compressedfp = gzopen(path, mode);
542  }
543 
544  fp->uncompressedfp = NULL;
545  if (fp->compressedfp == NULL)
546  {
547  free_keep_errno(fp);
548  fp = NULL;
549  }
550 #else
551  fatal("not built with zlib support");
552 #endif
553  }
554  else
555  {
556 #ifdef HAVE_LIBZ
557  fp->compressedfp = NULL;
558 #endif
559  fp->uncompressedfp = fopen(path, mode);
560  if (fp->uncompressedfp == NULL)
561  {
562  free_keep_errno(fp);
563  fp = NULL;
564  }
565  }
566 
567  return fp;
568 }
569 
570 
571 int
572 cfread(void *ptr, int size, cfp *fp)
573 {
574  int ret;
575 
576  if (size == 0)
577  return 0;
578 
579 #ifdef HAVE_LIBZ
580  if (fp->compressedfp)
581  {
582  ret = gzread(fp->compressedfp, ptr, size);
583  if (ret != size && !gzeof(fp->compressedfp))
584  {
585  int errnum;
586  const char *errmsg = gzerror(fp->compressedfp, &errnum);
587 
588  fatal("could not read from input file: %s",
589  errnum == Z_ERRNO ? strerror(errno) : errmsg);
590  }
591  }
592  else
593 #endif
594  {
595  ret = fread(ptr, 1, size, fp->uncompressedfp);
596  if (ret != size && !feof(fp->uncompressedfp))
598  }
599  return ret;
600 }
601 
602 int
603 cfwrite(const void *ptr, int size, cfp *fp)
604 {
605 #ifdef HAVE_LIBZ
606  if (fp->compressedfp)
607  return gzwrite(fp->compressedfp, ptr, size);
608  else
609 #endif
610  return fwrite(ptr, 1, size, fp->uncompressedfp);
611 }
612 
613 int
615 {
616  int ret;
617 
618 #ifdef HAVE_LIBZ
619  if (fp->compressedfp)
620  {
621  ret = gzgetc(fp->compressedfp);
622  if (ret == EOF)
623  {
624  if (!gzeof(fp->compressedfp))
625  fatal("could not read from input file: %s", strerror(errno));
626  else
627  fatal("could not read from input file: end of file");
628  }
629  }
630  else
631 #endif
632  {
633  ret = fgetc(fp->uncompressedfp);
634  if (ret == EOF)
636  }
637 
638  return ret;
639 }
640 
641 char *
642 cfgets(cfp *fp, char *buf, int len)
643 {
644 #ifdef HAVE_LIBZ
645  if (fp->compressedfp)
646  return gzgets(fp->compressedfp, buf, len);
647  else
648 #endif
649  return fgets(buf, len, fp->uncompressedfp);
650 }
651 
652 int
654 {
655  int result;
656 
657  if (fp == NULL)
658  {
659  errno = EBADF;
660  return EOF;
661  }
662 #ifdef HAVE_LIBZ
663  if (fp->compressedfp)
664  {
665  result = gzclose(fp->compressedfp);
666  fp->compressedfp = NULL;
667  }
668  else
669 #endif
670  {
671  result = fclose(fp->uncompressedfp);
672  fp->uncompressedfp = NULL;
673  }
674  free_keep_errno(fp);
675 
676  return result;
677 }
678 
679 int
680 cfeof(cfp *fp)
681 {
682 #ifdef HAVE_LIBZ
683  if (fp->compressedfp)
684  return gzeof(fp->compressedfp);
685  else
686 #endif
687  return feof(fp->uncompressedfp);
688 }
689 
690 const char *
692 {
693 #ifdef HAVE_LIBZ
694  if (fp->compressedfp)
695  {
696  int errnum;
697  const char *errmsg = gzerror(fp->compressedfp, &errnum);
698 
699  if (errnum != Z_ERRNO)
700  return errmsg;
701  }
702 #endif
703  return strerror(errno);
704 }
705 
706 #ifdef HAVE_LIBZ
/*
 * Return 1 if 'filename' ends with 'suffix', else 0.
 *
 * Lengths are kept as size_t, the type strlen() returns, so they are not
 * narrowed to int.  A filename shorter than the suffix never matches; an
 * empty suffix always matches.
 */
static int
hasSuffix(const char *filename, const char *suffix)
{
	size_t		filenamelen = strlen(filename);
	size_t		suffixlen = strlen(suffix);

	if (filenamelen < suffixlen)
		return 0;

	return memcmp(&filename[filenamelen - suffixlen],
				  suffix,
				  suffixlen) == 0;
}
720 
721 #endif
void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)
Definition: compress_io.c:179
static PgChecksumMode mode
Definition: pg_checksums.c:61
#define Z_DEFAULT_COMPRESSION
CompressionAlgorithm
Definition: compress_io.h:24
CompressorState * AllocateCompressor(int compression, WriteFunc writeF)
Definition: compress_io.c:124
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
int cfeof(cfp *fp)
Definition: compress_io.c:680
CompressionAlgorithm comprAlg
Definition: compress_io.c:67
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
static void free_keep_errno(void *p)
Definition: compress_io.c:437
FILE * uncompressedfp
Definition: compress_io.c:425
void(* WriteFunc)(ArchiveHandle *AH, const char *buf, size_t len)
Definition: compress_io.h:31
int cfread(void *ptr, int size, cfp *fp)
Definition: compress_io.c:572
WriteFunc writeF
Definition: compress_io.c:68
int cfclose(cfp *fp)
Definition: compress_io.c:653
static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, const char *data, size_t dLen)
Definition: compress_io.c:406
void ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
Definition: compress_io.c:157
static void ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level)
Definition: compress_io.c:102
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
cfp * cfopen(const char *path, const char *mode, int compression)
Definition: compress_io.c:522
static char * buf
Definition: pg_test_fsync.c:68
char * cfgets(cfp *fp, char *buf, int len)
Definition: compress_io.c:642
size_t(* ReadFunc)(ArchiveHandle *AH, char **buf, size_t *buflen)
Definition: compress_io.h:44
z_stream * z_streamp
#define ZLIB_IN_SIZE
Definition: compress_io.h:22
#define unconstify(underlying_type, expr)
Definition: c.h:1163
static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF)
Definition: compress_io.c:388
#define free(a)
Definition: header.h:65
int cfwrite(const void *ptr, int size, cfp *fp)
Definition: compress_io.c:603
#define strerror
Definition: port.h:205
int cfgetc(cfp *fp)
Definition: compress_io.c:614
#define fatal(...)
#define ZLIB_OUT_SIZE
Definition: walmethods.c:32
cfp * cfopen_write(const char *path, const char *mode, int compression)
Definition: compress_io.c:493
static char * filename
Definition: pg_dumpall.c:91
int errmsg(const char *fmt,...)
Definition: elog.c:784
cfp * cfopen_read(const char *path, const char *mode)
Definition: compress_io.c:456
#define READ_ERROR_EXIT(fd)
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
const char * get_cfp_error(cfp *fp)
Definition: compress_io.c:691
#define snprintf
Definition: port.h:192
void EndCompressor(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.c:202