PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
compress_io.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * compress_io.c
4  * Routines for archivers to write an uncompressed or compressed data
5  * stream.
6  *
7  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * This file includes two APIs for dealing with compressed data. The first
11  * provides more flexibility, using callbacks to read/write data from the
12  * underlying stream. The second API is a wrapper around fopen/gzopen and
13  * friends, providing an interface similar to those, but abstracts away
14  * the possible compression. Both APIs use libz for the compression, but
15  * the second API uses gzip headers, so the resulting files can be easily
16  * manipulated with the gzip utility.
17  *
18  * Compressor API
19  * --------------
20  *
21  * The interface for writing to an archive consists of three functions:
22  * AllocateCompressor, WriteDataToArchive and EndCompressor. First you call
23  * AllocateCompressor, then write all the data by calling WriteDataToArchive
24  * as many times as needed, and finally EndCompressor. WriteDataToArchive
25  * and EndCompressor will call the WriteFunc that was provided to
26  * AllocateCompressor for each chunk of compressed data.
27  *
28  * The interface for reading an archive consists of just one function:
29  * ReadDataFromArchive. ReadDataFromArchive reads the whole compressed input
30  * stream, by repeatedly calling the given ReadFunc. ReadFunc returns the
31  * compressed data chunk at a time, and ReadDataFromArchive decompresses it
32  * and passes the decompressed data to ahwrite(), until ReadFunc returns 0
33  * to signal EOF.
34  *
35  * The interface is the same for compressed and uncompressed streams.
36  *
37  * Compressed stream API
38  * ----------------------
39  *
40  * The compressed stream API is a wrapper around the C standard fopen() and
41  * libz's gzopen() APIs. It allows you to use the same functions for
42  * compressed and uncompressed streams. cfopen_read() first tries to open
43  * the file with given name, and if it fails, it tries to open the same
44  * file with the .gz suffix. cfopen_write() opens a file for writing, an
45  * extra argument specifies if the file should be compressed, and adds the
46  * .gz suffix to the filename if so. This allows you to easily handle both
47  * compressed and uncompressed files.
48  *
49  * IDENTIFICATION
50  * src/bin/pg_dump/compress_io.c
51  *
52  *-------------------------------------------------------------------------
53  */
54 #include "postgres_fe.h"
55 
56 #include "compress_io.h"
57 #include "pg_backup_utils.h"
58 
59 /*----------------------
60  * Compressor API
61  *----------------------
62  */
63 
64 /* typedef appears in compress_io.h */
66 {
69 
70 #ifdef HAVE_LIBZ
71  z_streamp zp;
72  char *zlibOut;
73  size_t zlibOutSize;
74 #endif
75 };
76 
77 /* translator: this is a module name */
78 static const char *modulename = gettext_noop("compress_io");
79 
80 static void ParseCompressionOption(int compression, CompressionAlgorithm *alg,
81  int *level);
82 
83 /* Routines that support zlib compressed data I/O */
84 #ifdef HAVE_LIBZ
85 static void InitCompressorZlib(CompressorState *cs, int level);
86 static void DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs,
87  bool flush);
88 static void ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF);
89 static void WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
90  const char *data, size_t dLen);
91 static void EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs);
92 #endif
93 
94 /* Routines that support uncompressed data I/O */
95 static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF);
97  const char *data, size_t dLen);
98 
99 /*
100  * Interprets a numeric 'compression' value. The algorithm implied by the
101  * value (zlib or none at the moment), is returned in *alg, and the
102  * zlib compression level in *level.
103  */
104 static void
105 ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level)
106 {
107  if (compression == Z_DEFAULT_COMPRESSION ||
108  (compression > 0 && compression <= 9))
109  *alg = COMPR_ALG_LIBZ;
110  else if (compression == 0)
111  *alg = COMPR_ALG_NONE;
112  else
113  {
114  exit_horribly(modulename, "invalid compression code: %d\n",
115  compression);
116  *alg = COMPR_ALG_NONE; /* keep compiler quiet */
117  }
118 
119  /* The level is just the passed-in value. */
120  if (level)
121  *level = compression;
122 }
123 
124 /* Public interface routines */
125 
126 /* Allocate a new compressor */
128 AllocateCompressor(int compression, WriteFunc writeF)
129 {
130  CompressorState *cs;
132  int level;
133 
134  ParseCompressionOption(compression, &alg, &level);
135 
136 #ifndef HAVE_LIBZ
137  if (alg == COMPR_ALG_LIBZ)
138  exit_horribly(modulename, "not built with zlib support\n");
139 #endif
140 
141  cs = (CompressorState *) pg_malloc0(sizeof(CompressorState));
142  cs->writeF = writeF;
143  cs->comprAlg = alg;
144 
145  /*
146  * Perform compression algorithm specific initialization.
147  */
148 #ifdef HAVE_LIBZ
149  if (alg == COMPR_ALG_LIBZ)
150  InitCompressorZlib(cs, level);
151 #endif
152 
153  return cs;
154 }
155 
156 /*
157  * Read all compressed data from the input stream (via readF) and print it
158  * out with ahwrite().
159  */
160 void
161 ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
162 {
164 
165  ParseCompressionOption(compression, &alg, NULL);
166 
167  if (alg == COMPR_ALG_NONE)
168  ReadDataFromArchiveNone(AH, readF);
169  if (alg == COMPR_ALG_LIBZ)
170  {
171 #ifdef HAVE_LIBZ
172  ReadDataFromArchiveZlib(AH, readF);
173 #else
174  exit_horribly(modulename, "not built with zlib support\n");
175 #endif
176  }
177 }
178 
179 /*
180  * Compress and write data to the output stream (via writeF).
181  */
182 void
184  const void *data, size_t dLen)
185 {
186  switch (cs->comprAlg)
187  {
188  case COMPR_ALG_LIBZ:
189 #ifdef HAVE_LIBZ
190  WriteDataToArchiveZlib(AH, cs, data, dLen);
191 #else
192  exit_horribly(modulename, "not built with zlib support\n");
193 #endif
194  break;
195  case COMPR_ALG_NONE:
196  WriteDataToArchiveNone(AH, cs, data, dLen);
197  break;
198  }
199  return;
200 }
201 
202 /*
203  * Terminate compression library context and flush its buffers.
204  */
205 void
207 {
208 #ifdef HAVE_LIBZ
209  if (cs->comprAlg == COMPR_ALG_LIBZ)
210  EndCompressorZlib(AH, cs);
211 #endif
212  free(cs);
213 }
214 
215 /* Private routines, specific to each compression method. */
216 
217 #ifdef HAVE_LIBZ
218 /*
219  * Functions for zlib compressed output.
220  */
221 
222 static void
223 InitCompressorZlib(CompressorState *cs, int level)
224 {
225  z_streamp zp;
226 
227  zp = cs->zp = (z_streamp) pg_malloc(sizeof(z_stream));
228  zp->zalloc = Z_NULL;
229  zp->zfree = Z_NULL;
230  zp->opaque = Z_NULL;
231 
232  /*
233  * zlibOutSize is the buffer size we tell zlib it can output to. We
234  * actually allocate one extra byte because some routines want to append a
235  * trailing zero byte to the zlib output.
236  */
237  cs->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
238  cs->zlibOutSize = ZLIB_OUT_SIZE;
239 
240  if (deflateInit(zp, level) != Z_OK)
242  "could not initialize compression library: %s\n",
243  zp->msg);
244 
245  /* Just be paranoid - maybe End is called after Start, with no Write */
246  zp->next_out = (void *) cs->zlibOut;
247  zp->avail_out = cs->zlibOutSize;
248 }
249 
250 static void
251 EndCompressorZlib(ArchiveHandle *AH, CompressorState *cs)
252 {
253  z_streamp zp = cs->zp;
254 
255  zp->next_in = NULL;
256  zp->avail_in = 0;
257 
258  /* Flush any remaining data from zlib buffer */
259  DeflateCompressorZlib(AH, cs, true);
260 
261  if (deflateEnd(zp) != Z_OK)
263  "could not close compression stream: %s\n", zp->msg);
264 
265  free(cs->zlibOut);
266  free(cs->zp);
267 }
268 
269 static void
270 DeflateCompressorZlib(ArchiveHandle *AH, CompressorState *cs, bool flush)
271 {
272  z_streamp zp = cs->zp;
273  char *out = cs->zlibOut;
274  int res = Z_OK;
275 
276  while (cs->zp->avail_in != 0 || flush)
277  {
278  res = deflate(zp, flush ? Z_FINISH : Z_NO_FLUSH);
279  if (res == Z_STREAM_ERROR)
281  "could not compress data: %s\n", zp->msg);
282  if ((flush && (zp->avail_out < cs->zlibOutSize))
283  || (zp->avail_out == 0)
284  || (zp->avail_in != 0)
285  )
286  {
287  /*
288  * Extra paranoia: avoid zero-length chunks, since a zero length
289  * chunk is the EOF marker in the custom format. This should never
290  * happen but...
291  */
292  if (zp->avail_out < cs->zlibOutSize)
293  {
294  /*
295  * Any write function should do its own error checking but to
296  * make sure we do a check here as well...
297  */
298  size_t len = cs->zlibOutSize - zp->avail_out;
299 
300  cs->writeF(AH, out, len);
301  }
302  zp->next_out = (void *) out;
303  zp->avail_out = cs->zlibOutSize;
304  }
305 
306  if (res == Z_STREAM_END)
307  break;
308  }
309 }
310 
311 static void
312 WriteDataToArchiveZlib(ArchiveHandle *AH, CompressorState *cs,
313  const char *data, size_t dLen)
314 {
315  cs->zp->next_in = (void *) data;
316  cs->zp->avail_in = dLen;
317  DeflateCompressorZlib(AH, cs, false);
318 
319  return;
320 }
321 
322 static void
323 ReadDataFromArchiveZlib(ArchiveHandle *AH, ReadFunc readF)
324 {
325  z_streamp zp;
326  char *out;
327  int res = Z_OK;
328  size_t cnt;
329  char *buf;
330  size_t buflen;
331 
332  zp = (z_streamp) pg_malloc(sizeof(z_stream));
333  zp->zalloc = Z_NULL;
334  zp->zfree = Z_NULL;
335  zp->opaque = Z_NULL;
336 
337  buf = pg_malloc(ZLIB_IN_SIZE);
338  buflen = ZLIB_IN_SIZE;
339 
340  out = pg_malloc(ZLIB_OUT_SIZE + 1);
341 
342  if (inflateInit(zp) != Z_OK)
344  "could not initialize compression library: %s\n",
345  zp->msg);
346 
347  /* no minimal chunk size for zlib */
348  while ((cnt = readF(AH, &buf, &buflen)))
349  {
350  zp->next_in = (void *) buf;
351  zp->avail_in = cnt;
352 
353  while (zp->avail_in > 0)
354  {
355  zp->next_out = (void *) out;
356  zp->avail_out = ZLIB_OUT_SIZE;
357 
358  res = inflate(zp, 0);
359  if (res != Z_OK && res != Z_STREAM_END)
361  "could not uncompress data: %s\n", zp->msg);
362 
363  out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
364  ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
365  }
366  }
367 
368  zp->next_in = NULL;
369  zp->avail_in = 0;
370  while (res != Z_STREAM_END)
371  {
372  zp->next_out = (void *) out;
373  zp->avail_out = ZLIB_OUT_SIZE;
374  res = inflate(zp, 0);
375  if (res != Z_OK && res != Z_STREAM_END)
377  "could not uncompress data: %s\n", zp->msg);
378 
379  out[ZLIB_OUT_SIZE - zp->avail_out] = '\0';
380  ahwrite(out, 1, ZLIB_OUT_SIZE - zp->avail_out, AH);
381  }
382 
383  if (inflateEnd(zp) != Z_OK)
385  "could not close compression library: %s\n", zp->msg);
386 
387  free(buf);
388  free(out);
389  free(zp);
390 }
391 #endif /* HAVE_LIBZ */
392 
393 
394 /*
395  * Functions for uncompressed output.
396  */
397 
398 static void
400 {
401  size_t cnt;
402  char *buf;
403  size_t buflen;
404 
405  buf = pg_malloc(ZLIB_OUT_SIZE);
406  buflen = ZLIB_OUT_SIZE;
407 
408  while ((cnt = readF(AH, &buf, &buflen)))
409  {
410  ahwrite(buf, 1, cnt, AH);
411  }
412 
413  free(buf);
414 }
415 
416 static void
418  const char *data, size_t dLen)
419 {
420  cs->writeF(AH, data, dLen);
421  return;
422 }
423 
424 
425 /*----------------------
426  * Compressed stream API
427  *----------------------
428  */
429 
/*
 * cfp represents an open stream, wrapping the underlying FILE or gzFile
 * pointer. This is opaque to the callers.
 *
 * cfopen() sets exactly one of the two pointers and leaves the other NULL.
 */
struct cfp
{
	FILE	   *uncompressedfp;	/* set when opened with fopen() */
#ifdef HAVE_LIBZ
	gzFile		compressedfp;	/* set when opened with gzopen() */
#endif
};

#ifdef HAVE_LIBZ
static int	hasSuffix(const char *filename, const char *suffix);
#endif
445 
/*
 * free() without changing errno; useful in several places below.
 *
 * Lets a caller release memory on a failure path and still report the
 * errno of the call that failed.
 */
static void
free_keep_errno(void *p)
{
	int			save_errno = errno;

	free(p);
	errno = save_errno;
}
455 
456 /*
457  * Open a file for reading. 'path' is the file to open, and 'mode' should
458  * be either "r" or "rb".
459  *
460  * If the file at 'path' does not exist, we append the ".gz" suffix (if 'path'
461  * doesn't already have it) and try again. So if you pass "foo" as 'path',
462  * this will open either "foo" or "foo.gz".
463  *
464  * On failure, return NULL with an error code in errno.
465  */
466 cfp *
467 cfopen_read(const char *path, const char *mode)
468 {
469  cfp *fp;
470 
471 #ifdef HAVE_LIBZ
472  if (hasSuffix(path, ".gz"))
473  fp = cfopen(path, mode, 1);
474  else
475 #endif
476  {
477  fp = cfopen(path, mode, 0);
478 #ifdef HAVE_LIBZ
479  if (fp == NULL)
480  {
481  char *fname;
482 
483  fname = psprintf("%s.gz", path);
484  fp = cfopen(fname, mode, 1);
485  free_keep_errno(fname);
486  }
487 #endif
488  }
489  return fp;
490 }
491 
492 /*
493  * Open a file for writing. 'path' indicates the path name, and 'mode' must
494  * be a filemode as accepted by fopen() and gzopen() that indicates writing
495  * ("w", "wb", "a", or "ab").
496  *
497  * If 'compression' is non-zero, a gzip compressed stream is opened, and
498  * 'compression' indicates the compression level used. The ".gz" suffix
499  * is automatically added to 'path' in that case.
500  *
501  * On failure, return NULL with an error code in errno.
502  */
503 cfp *
504 cfopen_write(const char *path, const char *mode, int compression)
505 {
506  cfp *fp;
507 
508  if (compression == 0)
509  fp = cfopen(path, mode, 0);
510  else
511  {
512 #ifdef HAVE_LIBZ
513  char *fname;
514 
515  fname = psprintf("%s.gz", path);
516  fp = cfopen(fname, mode, compression);
517  free_keep_errno(fname);
518 #else
519  exit_horribly(modulename, "not built with zlib support\n");
520  fp = NULL; /* keep compiler quiet */
521 #endif
522  }
523  return fp;
524 }
525 
526 /*
527  * Opens file 'path' in 'mode'. If 'compression' is non-zero, the file
528  * is opened with libz gzopen(), otherwise with plain fopen().
529  *
530  * On failure, return NULL with an error code in errno.
531  */
532 cfp *
533 cfopen(const char *path, const char *mode, int compression)
534 {
535  cfp *fp = pg_malloc(sizeof(cfp));
536 
537  if (compression != 0)
538  {
539 #ifdef HAVE_LIBZ
540  if (compression != Z_DEFAULT_COMPRESSION)
541  {
542  /* user has specified a compression level, so tell zlib to use it */
543  char mode_compression[32];
544 
545  snprintf(mode_compression, sizeof(mode_compression), "%s%d",
546  mode, compression);
547  fp->compressedfp = gzopen(path, mode_compression);
548  }
549  else
550  {
551  /* don't specify a level, just use the zlib default */
552  fp->compressedfp = gzopen(path, mode);
553  }
554 
555  fp->uncompressedfp = NULL;
556  if (fp->compressedfp == NULL)
557  {
558  free_keep_errno(fp);
559  fp = NULL;
560  }
561 #else
562  exit_horribly(modulename, "not built with zlib support\n");
563 #endif
564  }
565  else
566  {
567 #ifdef HAVE_LIBZ
568  fp->compressedfp = NULL;
569 #endif
570  fp->uncompressedfp = fopen(path, mode);
571  if (fp->uncompressedfp == NULL)
572  {
573  free_keep_errno(fp);
574  fp = NULL;
575  }
576  }
577 
578  return fp;
579 }
580 
581 
582 int
583 cfread(void *ptr, int size, cfp *fp)
584 {
585  int ret;
586 
587  if (size == 0)
588  return 0;
589 
590 #ifdef HAVE_LIBZ
591  if (fp->compressedfp)
592  {
593  ret = gzread(fp->compressedfp, ptr, size);
594  if (ret != size && !gzeof(fp->compressedfp))
596  "could not read from input file: %s\n", strerror(errno));
597  }
598  else
599 #endif
600  {
601  ret = fread(ptr, 1, size, fp->uncompressedfp);
602  if (ret != size && !feof(fp->uncompressedfp))
604  }
605  return ret;
606 }
607 
608 int
609 cfwrite(const void *ptr, int size, cfp *fp)
610 {
611 #ifdef HAVE_LIBZ
612  if (fp->compressedfp)
613  return gzwrite(fp->compressedfp, ptr, size);
614  else
615 #endif
616  return fwrite(ptr, 1, size, fp->uncompressedfp);
617 }
618 
619 int
621 {
622  int ret;
623 
624 #ifdef HAVE_LIBZ
625  if (fp->compressedfp)
626  {
627  ret = gzgetc(fp->compressedfp);
628  if (ret == EOF)
629  {
630  if (!gzeof(fp->compressedfp))
632  "could not read from input file: %s\n", strerror(errno));
633  else
635  "could not read from input file: end of file\n");
636  }
637  }
638  else
639 #endif
640  {
641  ret = fgetc(fp->uncompressedfp);
642  if (ret == EOF)
644  }
645 
646  return ret;
647 }
648 
649 char *
650 cfgets(cfp *fp, char *buf, int len)
651 {
652 #ifdef HAVE_LIBZ
653  if (fp->compressedfp)
654  return gzgets(fp->compressedfp, buf, len);
655  else
656 #endif
657  return fgets(buf, len, fp->uncompressedfp);
658 }
659 
660 int
662 {
663  int result;
664 
665  if (fp == NULL)
666  {
667  errno = EBADF;
668  return EOF;
669  }
670 #ifdef HAVE_LIBZ
671  if (fp->compressedfp)
672  {
673  result = gzclose(fp->compressedfp);
674  fp->compressedfp = NULL;
675  }
676  else
677 #endif
678  {
679  result = fclose(fp->uncompressedfp);
680  fp->uncompressedfp = NULL;
681  }
682  free_keep_errno(fp);
683 
684  return result;
685 }
686 
687 int
688 cfeof(cfp *fp)
689 {
690 #ifdef HAVE_LIBZ
691  if (fp->compressedfp)
692  return gzeof(fp->compressedfp);
693  else
694 #endif
695  return feof(fp->uncompressedfp);
696 }
697 
698 #ifdef HAVE_LIBZ
/*
 * Return 1 if 'filename' ends with 'suffix', else 0.
 *
 * The comparison is byte-wise and case-sensitive.  An empty 'suffix'
 * matches every filename.
 */
static int
hasSuffix(const char *filename, const char *suffix)
{
	/* size_t, not int: strlen() results must not be narrowed */
	size_t		filenamelen = strlen(filename);
	size_t		suffixlen = strlen(suffix);

	if (filenamelen < suffixlen)
		return 0;

	return memcmp(&filename[filenamelen - suffixlen],
				  suffix,
				  suffixlen) == 0;
}
712 
713 #endif
void WriteDataToArchive(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)
Definition: compress_io.c:183
#define Z_DEFAULT_COMPRESSION
static const char * modulename
Definition: compress_io.c:78
CompressionAlgorithm
Definition: compress_io.h:24
CompressorState * AllocateCompressor(int compression, WriteFunc writeF)
Definition: compress_io.c:128
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
int cfeof(cfp *fp)
Definition: compress_io.c:688
CompressionAlgorithm comprAlg
Definition: compress_io.c:67
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
static void free_keep_errno(void *p)
Definition: compress_io.c:448
#define gettext_noop(x)
Definition: c.h:139
FILE * uncompressedfp
Definition: compress_io.c:436
int cfread(void *ptr, int size, cfp *fp)
Definition: compress_io.c:583
WriteFunc writeF
Definition: compress_io.c:68
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
int cfclose(cfp *fp)
Definition: compress_io.c:661
static void WriteDataToArchiveNone(ArchiveHandle *AH, CompressorState *cs, const char *data, size_t dLen)
Definition: compress_io.c:417
void ReadDataFromArchive(ArchiveHandle *AH, int compression, ReadFunc readF)
Definition: compress_io.c:161
void(* WriteFunc)(ArchiveHandle *AH, const char *buf, size_t len)
Definition: compress_io.h:31
static void ParseCompressionOption(int compression, CompressionAlgorithm *alg, int *level)
Definition: compress_io.c:105
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
cfp * cfopen(const char *path, const char *mode, int compression)
Definition: compress_io.c:533
static char * buf
Definition: pg_test_fsync.c:65
char * cfgets(cfp *fp, char *buf, int len)
Definition: compress_io.c:650
z_stream * z_streamp
#define ZLIB_IN_SIZE
Definition: compress_io.h:22
static void ReadDataFromArchiveNone(ArchiveHandle *AH, ReadFunc readF)
Definition: compress_io.c:399
#define free(a)
Definition: header.h:60
#define NULL
Definition: c.h:226
int cfwrite(const void *ptr, int size, cfp *fp)
Definition: compress_io.c:609
size_t(* ReadFunc)(ArchiveHandle *AH, char **buf, size_t *buflen)
Definition: compress_io.h:44
int cfgetc(cfp *fp)
Definition: compress_io.c:620
#define ZLIB_OUT_SIZE
Definition: walmethods.c:31
void exit_horribly(const char *modulename, const char *fmt,...)
cfp * cfopen_write(const char *path, const char *mode, int compression)
Definition: compress_io.c:504
static char * filename
Definition: pg_dumpall.c:84
cfp * cfopen_read(const char *path, const char *mode)
Definition: compress_io.c:467
const char * strerror(int errnum)
Definition: strerror.c:19
#define READ_ERROR_EXIT(fd)
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
void EndCompressor(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.c:206