PostgreSQL Source Code  git master
compress_zstd.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * compress_zstd.c
 *	 Routines for archivers to write a Zstd compressed data stream.
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	   src/bin/pg_dump/compress_zstd.c
 *
 *-------------------------------------------------------------------------
 */
14 
15 #include "postgres_fe.h"
16 
17 #include "compress_zstd.h"
18 #include "pg_backup_utils.h"
19 
20 #ifndef USE_ZSTD
21 
22 void
24 {
25  pg_fatal("this build does not support compression with %s", "ZSTD");
26 }
27 
28 void
30 {
31  pg_fatal("this build does not support compression with %s", "ZSTD");
32 }
33 
34 #else
35 
36 #include <zstd.h>
37 
38 typedef struct ZstdCompressorState
39 {
40  /* This is a normal file to which we read/write compressed data */
41  FILE *fp;
42 
43  ZSTD_CStream *cstream;
44  ZSTD_DStream *dstream;
45  ZSTD_outBuffer output;
46  ZSTD_inBuffer input;
47 
48  /* pointer to a static string like from strerror(), for Zstd_write() */
49  const char *zstderror;
50 } ZstdCompressorState;
51 
52 static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
53 static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
54 static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
55  const void *data, size_t dLen);
56 static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
57 
58 static void
59 _Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
60  ZSTD_cParameter param, int value, char *paramname)
61 {
62  size_t res;
63 
64  res = ZSTD_CCtx_setParameter(cstream, param, value);
65  if (ZSTD_isError(res))
66  pg_fatal("could not set compression parameter \"%s\": %s",
67  paramname, ZSTD_getErrorName(res));
68 }
69 
70 /* Return a compression stream with parameters set per argument */
71 static ZSTD_CStream *
72 _ZstdCStreamParams(pg_compress_specification compress)
73 {
74  ZSTD_CStream *cstream;
75 
76  cstream = ZSTD_createCStream();
77  if (cstream == NULL)
78  pg_fatal("could not initialize compression library");
79 
80  _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
81  compress.level, "level");
82 
84  _Zstd_CCtx_setParam_or_die(cstream,
85  ZSTD_c_enableLongDistanceMatching,
86  compress.long_distance, "long");
87 
88  return cstream;
89 }
90 
91 /* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
92 static void
93 _ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
94 {
95  ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
96  ZSTD_inBuffer *input = &zstdcs->input;
97  ZSTD_outBuffer *output = &zstdcs->output;
98 
99  /* Loop while there's any input or until flushed */
100  while (input->pos != input->size || flush)
101  {
102  size_t res;
103 
104  output->pos = 0;
105  res = ZSTD_compressStream2(zstdcs->cstream, output,
106  input, flush ? ZSTD_e_end : ZSTD_e_continue);
107 
108  if (ZSTD_isError(res))
109  pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
110 
111  /*
112  * Extra paranoia: avoid zero-length chunks, since a zero length chunk
113  * is the EOF marker in the custom format. This should never happen
114  * but...
115  */
116  if (output->pos > 0)
117  cs->writeF(AH, output->dst, output->pos);
118 
119  if (res == 0)
120  break; /* End of frame or all input consumed */
121  }
122 }
123 
124 static void
125 EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
126 {
127  ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
128 
129  if (cs->readF != NULL)
130  {
131  Assert(zstdcs->cstream == NULL);
132  ZSTD_freeDStream(zstdcs->dstream);
133  pg_free(unconstify(void *, zstdcs->input.src));
134  }
135  else if (cs->writeF != NULL)
136  {
137  Assert(zstdcs->dstream == NULL);
138  _ZstdWriteCommon(AH, cs, true);
139  ZSTD_freeCStream(zstdcs->cstream);
140  pg_free(zstdcs->output.dst);
141  }
142 
143  pg_free(zstdcs);
144 }
145 
146 static void
147 WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
148  const void *data, size_t dLen)
149 {
150  ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
151 
152  zstdcs->input.src = data;
153  zstdcs->input.size = dLen;
154  zstdcs->input.pos = 0;
155 
156  _ZstdWriteCommon(AH, cs, false);
157 }
158 
159 static void
160 ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
161 {
162  ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
163  ZSTD_outBuffer *output = &zstdcs->output;
164  ZSTD_inBuffer *input = &zstdcs->input;
165  size_t input_allocated_size = ZSTD_DStreamInSize();
166  size_t res;
167 
168  for (;;)
169  {
170  size_t cnt;
171 
172  /*
173  * Read compressed data. Note that readF can resize the buffer; the
174  * new size is tracked and used for future loops.
175  */
176  input->size = input_allocated_size;
177  cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
178 
179  /* ensure that readF didn't *shrink* the buffer */
180  Assert(input->size >= input_allocated_size);
181  input_allocated_size = input->size;
182  input->size = cnt;
183  input->pos = 0;
184 
185  if (cnt == 0)
186  break;
187 
188  /* Now decompress */
189  while (input->pos < input->size)
190  {
191  output->pos = 0;
192  res = ZSTD_decompressStream(zstdcs->dstream, output, input);
193  if (ZSTD_isError(res))
194  pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
195 
196  /*
197  * then write the decompressed data to the output handle
198  */
199  ((char *) output->dst)[output->pos] = '\0';
200  ahwrite(output->dst, 1, output->pos, AH);
201 
202  if (res == 0)
203  break; /* End of frame */
204  }
205  }
206 }
207 
208 /* Public routine that supports Zstd compressed data I/O */
209 void
211  const pg_compress_specification compression_spec)
212 {
213  ZstdCompressorState *zstdcs;
214 
215  cs->readData = ReadDataFromArchiveZstd;
216  cs->writeData = WriteDataToArchiveZstd;
217  cs->end = EndCompressorZstd;
218 
219  cs->compression_spec = compression_spec;
220 
221  zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
222  cs->private_data = zstdcs;
223 
224  /* We expect that exactly one of readF/writeF is specified */
225  Assert((cs->readF == NULL) != (cs->writeF == NULL));
226 
227  if (cs->readF != NULL)
228  {
229  zstdcs->dstream = ZSTD_createDStream();
230  if (zstdcs->dstream == NULL)
231  pg_fatal("could not initialize compression library");
232 
233  zstdcs->input.size = ZSTD_DStreamInSize();
234  zstdcs->input.src = pg_malloc(zstdcs->input.size);
235 
236  /*
237  * output.size is the buffer size we tell zstd it can output to.
238  * Allocate an additional byte such that ReadDataFromArchiveZstd() can
239  * call ahwrite() with a null-terminated string, which is an optimized
240  * case in ExecuteSqlCommandBuf().
241  */
242  zstdcs->output.size = ZSTD_DStreamOutSize();
243  zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
244  }
245  else if (cs->writeF != NULL)
246  {
247  zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
248 
249  zstdcs->output.size = ZSTD_CStreamOutSize();
250  zstdcs->output.dst = pg_malloc(zstdcs->output.size);
251  zstdcs->output.pos = 0;
252  }
253 }
254 
255 /*
256  * Compressed stream API
257  */
258 
259 static bool
260 Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
261 {
262  ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
263  ZSTD_inBuffer *input = &zstdcs->input;
264  ZSTD_outBuffer *output = &zstdcs->output;
265  size_t input_allocated_size = ZSTD_DStreamInSize();
266  size_t res,
267  cnt;
268 
269  output->size = size;
270  output->dst = ptr;
271  output->pos = 0;
272 
273  for (;;)
274  {
275  Assert(input->pos <= input->size);
276  Assert(input->size <= input_allocated_size);
277 
278  /*
279  * If the input is completely consumed, start back at the beginning
280  */
281  if (input->pos == input->size)
282  {
283  /* input->size is size produced by "fread" */
284  input->size = 0;
285  /* input->pos is position consumed by decompress */
286  input->pos = 0;
287  }
288 
289  /* read compressed data if we must produce more input */
290  if (input->pos == input->size)
291  {
292  cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
293  input->size = cnt;
294 
295  Assert(cnt <= input_allocated_size);
296 
297  /* If we have no more input to consume, we're done */
298  if (cnt == 0)
299  break;
300  }
301 
302  while (input->pos < input->size)
303  {
304  /* now decompress */
305  res = ZSTD_decompressStream(zstdcs->dstream, output, input);
306 
307  if (ZSTD_isError(res))
308  pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
309 
310  if (output->pos == output->size)
311  break; /* No more room for output */
312 
313  if (res == 0)
314  break; /* End of frame */
315  }
316 
317  if (output->pos == output->size)
318  break; /* We read all the data that fits */
319  }
320 
321  if (rdsize != NULL)
322  *rdsize = output->pos;
323 
324  return true;
325 }
326 
327 static bool
328 Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
329 {
330  ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
331  ZSTD_inBuffer *input = &zstdcs->input;
332  ZSTD_outBuffer *output = &zstdcs->output;
333  size_t res,
334  cnt;
335 
336  input->src = ptr;
337  input->size = size;
338  input->pos = 0;
339 
340  /* Consume all input, to be flushed later */
341  while (input->pos != input->size)
342  {
343  output->pos = 0;
344  res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
345  if (ZSTD_isError(res))
346  {
347  zstdcs->zstderror = ZSTD_getErrorName(res);
348  return false;
349  }
350 
351  cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
352  if (cnt != output->pos)
353  {
354  zstdcs->zstderror = strerror(errno);
355  return false;
356  }
357  }
358 
359  return size;
360 }
361 
362 static int
363 Zstd_getc(CompressFileHandle *CFH)
364 {
365  ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
366  int ret;
367 
368  if (CFH->read_func(&ret, 1, NULL, CFH) != 1)
369  {
370  if (feof(zstdcs->fp))
371  pg_fatal("could not read from input file: end of file");
372  else
373  pg_fatal("could not read from input file: %m");
374  }
375  return ret;
376 }
377 
378 static char *
379 Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
380 {
381  int i;
382 
383  Assert(len > 0);
384 
385  /*
386  * Read one byte at a time until newline or EOF. This is only used to read
387  * the list of LOs, and the I/O is buffered anyway.
388  */
389  for (i = 0; i < len - 1; ++i)
390  {
391  size_t readsz;
392 
393  if (!CFH->read_func(&buf[i], 1, &readsz, CFH))
394  break;
395  if (readsz != 1)
396  break;
397  if (buf[i] == '\n')
398  {
399  ++i;
400  break;
401  }
402  }
403  buf[i] = '\0';
404  return i > 0 ? buf : NULL;
405 }
406 
407 static bool
408 Zstd_close(CompressFileHandle *CFH)
409 {
410  ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
411 
412  if (zstdcs->cstream)
413  {
414  size_t res,
415  cnt;
416  ZSTD_inBuffer *input = &zstdcs->input;
417  ZSTD_outBuffer *output = &zstdcs->output;
418 
419  /* Loop until the compression buffers are fully consumed */
420  for (;;)
421  {
422  output->pos = 0;
423  res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
424  if (ZSTD_isError(res))
425  {
426  zstdcs->zstderror = ZSTD_getErrorName(res);
427  return false;
428  }
429 
430  cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
431  if (cnt != output->pos)
432  {
433  zstdcs->zstderror = strerror(errno);
434  return false;
435  }
436 
437  if (res == 0)
438  break; /* End of frame */
439  }
440 
441  ZSTD_freeCStream(zstdcs->cstream);
442  pg_free(zstdcs->output.dst);
443  }
444 
445  if (zstdcs->dstream)
446  {
447  ZSTD_freeDStream(zstdcs->dstream);
448  pg_free(unconstify(void *, zstdcs->input.src));
449  }
450 
451  if (fclose(zstdcs->fp) != 0)
452  return false;
453 
454  pg_free(zstdcs);
455  return true;
456 }
457 
458 static bool
459 Zstd_eof(CompressFileHandle *CFH)
460 {
461  ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
462 
463  return feof(zstdcs->fp);
464 }
465 
466 static bool
467 Zstd_open(const char *path, int fd, const char *mode,
468  CompressFileHandle *CFH)
469 {
470  FILE *fp;
471  ZstdCompressorState *zstdcs;
472 
473  if (fd >= 0)
474  fp = fdopen(fd, mode);
475  else
476  fp = fopen(path, mode);
477 
478  if (fp == NULL)
479  return false;
480 
481  zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
482  CFH->private_data = zstdcs;
483  zstdcs->fp = fp;
484 
485  if (mode[0] == 'r')
486  {
487  zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
488  zstdcs->dstream = ZSTD_createDStream();
489  if (zstdcs->dstream == NULL)
490  pg_fatal("could not initialize compression library");
491  }
492  else if (mode[0] == 'w' || mode[0] == 'a')
493  {
494  zstdcs->output.size = ZSTD_CStreamOutSize();
495  zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
496  zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
497  if (zstdcs->cstream == NULL)
498  pg_fatal("could not initialize compression library");
499  }
500  else
501  pg_fatal("unhandled mode \"%s\"", mode);
502 
503  return true;
504 }
505 
506 static bool
507 Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
508 {
509  char fname[MAXPGPATH];
510 
511  sprintf(fname, "%s.zst", path);
512  return CFH->open_func(fname, -1, mode, CFH);
513 }
514 
515 static const char *
516 Zstd_get_error(CompressFileHandle *CFH)
517 {
518  ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
519 
520  return zstdcs->zstderror;
521 }
522 
523 void
525  const pg_compress_specification compression_spec)
526 {
527  CFH->open_func = Zstd_open;
528  CFH->open_write_func = Zstd_open_write;
529  CFH->read_func = Zstd_read;
530  CFH->write_func = Zstd_write;
531  CFH->gets_func = Zstd_gets;
532  CFH->getc_func = Zstd_getc;
533  CFH->close_func = Zstd_close;
534  CFH->eof_func = Zstd_eof;
535  CFH->get_error_func = Zstd_get_error;
536 
537  CFH->compression_spec = compression_spec;
538 
539  CFH->private_data = NULL;
540 }
541 
542 #endif /* USE_ZSTD */
#define unconstify(underlying_type, expr)
Definition: c.h:1236
#define Assert(condition)
Definition: c.h:849
void InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
Definition: compress_zstd.c:23
void InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
Definition: compress_zstd.c:29
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
Definition: compression.h:30
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void pg_free(void *ptr)
Definition: fe_memutils.c:105
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
FILE * input
FILE * output
static struct @157 value
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
#define pg_fatal(...)
static PgChecksumMode mode
Definition: pg_checksums.c:56
#define MAXPGPATH
const void size_t len
const void * data
while(p+4<=pend)
static char * buf
Definition: pg_test_fsync.c:73
#define sprintf
Definition: port.h:240
#define strerror
Definition: port.h:251
static int fd(const char *x, int i)
Definition: preproc-init.c:105
static pg_noinline void Size size
Definition: slab.c:607
bool(* open_write_func)(const char *path, const char *mode, CompressFileHandle *CFH)
Definition: compress_io.h:122
const char *(* get_error_func)(CompressFileHandle *CFH)
Definition: compress_io.h:181
bool(* write_func)(const void *ptr, size_t size, struct CompressFileHandle *CFH)
Definition: compress_io.h:139
int(* getc_func)(CompressFileHandle *CFH)
Definition: compress_io.h:161
char *(* gets_func)(char *s, int size, CompressFileHandle *CFH)
Definition: compress_io.h:152
bool(* eof_func)(CompressFileHandle *CFH)
Definition: compress_io.h:168
bool(* open_func)(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
Definition: compress_io.h:111
pg_compress_specification compression_spec
Definition: compress_io.h:186
bool(* close_func)(CompressFileHandle *CFH)
Definition: compress_io.h:175
bool(* read_func)(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
Definition: compress_io.h:131
void * private_data
Definition: compress_io.h:87
void(* readData)(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.h:56
pg_compress_specification compression_spec
Definition: compress_io.h:82
void(* end)(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.h:67
ReadFunc readF
Definition: compress_io.h:72
void(* writeData)(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)
Definition: compress_io.h:61
WriteFunc writeF
Definition: compress_io.h:77