PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
compress_zstd.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * compress_zstd.c
 *	 Routines for archivers to read and write a Zstd compressed data stream.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	   src/bin/pg_dump/compress_zstd.c
 *
 *-------------------------------------------------------------------------
 */
14
15#include "postgres_fe.h"
16
17#include "compress_zstd.h"
18#include "pg_backup_utils.h"
19
20#ifndef USE_ZSTD
21
22void
24{
25 pg_fatal("this build does not support compression with %s", "ZSTD");
26}
27
28void
30{
31 pg_fatal("this build does not support compression with %s", "ZSTD");
32}
33
34#else
35
36#include <zstd.h>
37
38typedef struct ZstdCompressorState
39{
40 /* This is a normal file to which we read/write compressed data */
41 FILE *fp;
42
43 ZSTD_CStream *cstream;
44 ZSTD_DStream *dstream;
45 ZSTD_outBuffer output;
46 ZSTD_inBuffer input;
47
48 /* pointer to a static string like from strerror(), for Zstd_write() */
49 const char *zstderror;
50} ZstdCompressorState;
51
52static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
53static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
54static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
55 const void *data, size_t dLen);
56static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
57
58static void
59_Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
60 ZSTD_cParameter param, int value, char *paramname)
61{
62 size_t res;
63
64 res = ZSTD_CCtx_setParameter(cstream, param, value);
65 if (ZSTD_isError(res))
66 pg_fatal("could not set compression parameter \"%s\": %s",
67 paramname, ZSTD_getErrorName(res));
68}
69
70/* Return a compression stream with parameters set per argument */
71static ZSTD_CStream *
72_ZstdCStreamParams(pg_compress_specification compress)
73{
74 ZSTD_CStream *cstream;
75
76 cstream = ZSTD_createCStream();
77 if (cstream == NULL)
78 pg_fatal("could not initialize compression library");
79
80 _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
81 compress.level, "level");
82
84 _Zstd_CCtx_setParam_or_die(cstream,
85 ZSTD_c_enableLongDistanceMatching,
86 compress.long_distance, "long");
87
88 return cstream;
89}
90
91/* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
92static void
93_ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
94{
95 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
96 ZSTD_inBuffer *input = &zstdcs->input;
97 ZSTD_outBuffer *output = &zstdcs->output;
98
99 /* Loop while there's any input or until flushed */
100 while (input->pos != input->size || flush)
101 {
102 size_t res;
103
104 output->pos = 0;
105 res = ZSTD_compressStream2(zstdcs->cstream, output,
106 input, flush ? ZSTD_e_end : ZSTD_e_continue);
107
108 if (ZSTD_isError(res))
109 pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
110
111 /*
112 * Extra paranoia: avoid zero-length chunks, since a zero length chunk
113 * is the EOF marker in the custom format. This should never happen
114 * but...
115 */
116 if (output->pos > 0)
117 cs->writeF(AH, output->dst, output->pos);
118
119 if (res == 0)
120 break; /* End of frame or all input consumed */
121 }
122}
123
124static void
125EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
126{
127 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
128
129 if (cs->readF != NULL)
130 {
131 Assert(zstdcs->cstream == NULL);
132 ZSTD_freeDStream(zstdcs->dstream);
133 pg_free(unconstify(void *, zstdcs->input.src));
134 }
135 else if (cs->writeF != NULL)
136 {
137 Assert(zstdcs->dstream == NULL);
138 _ZstdWriteCommon(AH, cs, true);
139 ZSTD_freeCStream(zstdcs->cstream);
140 }
141
142 /* output buffer may be allocated in either mode */
143 pg_free(zstdcs->output.dst);
144 pg_free(zstdcs);
145}
146
147static void
148WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
149 const void *data, size_t dLen)
150{
151 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
152
153 zstdcs->input.src = data;
154 zstdcs->input.size = dLen;
155 zstdcs->input.pos = 0;
156
157 _ZstdWriteCommon(AH, cs, false);
158}
159
160static void
161ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
162{
163 ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
164 ZSTD_outBuffer *output = &zstdcs->output;
165 ZSTD_inBuffer *input = &zstdcs->input;
166 size_t input_allocated_size = ZSTD_DStreamInSize();
167 size_t res;
168
169 for (;;)
170 {
171 size_t cnt;
172
173 /*
174 * Read compressed data. Note that readF can resize the buffer; the
175 * new size is tracked and used for future loops.
176 */
177 input->size = input_allocated_size;
178 cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
179
180 /* ensure that readF didn't *shrink* the buffer */
181 Assert(input->size >= input_allocated_size);
182 input_allocated_size = input->size;
183 input->size = cnt;
184 input->pos = 0;
185
186 if (cnt == 0)
187 break;
188
189 /* Now decompress */
190 while (input->pos < input->size)
191 {
192 output->pos = 0;
193 res = ZSTD_decompressStream(zstdcs->dstream, output, input);
194 if (ZSTD_isError(res))
195 pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
196
197 /*
198 * then write the decompressed data to the output handle
199 */
200 ((char *) output->dst)[output->pos] = '\0';
201 ahwrite(output->dst, 1, output->pos, AH);
202
203 if (res == 0)
204 break; /* End of frame */
205 }
206 }
207}
208
209/* Public routine that supports Zstd compressed data I/O */
210void
212 const pg_compress_specification compression_spec)
213{
214 ZstdCompressorState *zstdcs;
215
216 cs->readData = ReadDataFromArchiveZstd;
217 cs->writeData = WriteDataToArchiveZstd;
218 cs->end = EndCompressorZstd;
219
220 cs->compression_spec = compression_spec;
221
222 zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
223 cs->private_data = zstdcs;
224
225 /* We expect that exactly one of readF/writeF is specified */
226 Assert((cs->readF == NULL) != (cs->writeF == NULL));
227
228 if (cs->readF != NULL)
229 {
230 zstdcs->dstream = ZSTD_createDStream();
231 if (zstdcs->dstream == NULL)
232 pg_fatal("could not initialize compression library");
233
234 zstdcs->input.size = ZSTD_DStreamInSize();
235 zstdcs->input.src = pg_malloc(zstdcs->input.size);
236
237 /*
238 * output.size is the buffer size we tell zstd it can output to.
239 * Allocate an additional byte such that ReadDataFromArchiveZstd() can
240 * call ahwrite() with a null-terminated string, which is an optimized
241 * case in ExecuteSqlCommandBuf().
242 */
243 zstdcs->output.size = ZSTD_DStreamOutSize();
244 zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
245 }
246 else if (cs->writeF != NULL)
247 {
248 zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
249
250 zstdcs->output.size = ZSTD_CStreamOutSize();
251 zstdcs->output.dst = pg_malloc(zstdcs->output.size);
252 zstdcs->output.pos = 0;
253 }
254}
255
256/*
257 * Compressed stream API
258 */
259
260static bool
261Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
262{
263 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
264 ZSTD_inBuffer *input = &zstdcs->input;
265 ZSTD_outBuffer *output = &zstdcs->output;
266 size_t input_allocated_size = ZSTD_DStreamInSize();
267 size_t res,
268 cnt;
269
270 output->size = size;
271 output->dst = ptr;
272 output->pos = 0;
273
274 for (;;)
275 {
276 Assert(input->pos <= input->size);
277 Assert(input->size <= input_allocated_size);
278
279 /*
280 * If the input is completely consumed, start back at the beginning
281 */
282 if (input->pos == input->size)
283 {
284 /* input->size is size produced by "fread" */
285 input->size = 0;
286 /* input->pos is position consumed by decompress */
287 input->pos = 0;
288 }
289
290 /* read compressed data if we must produce more input */
291 if (input->pos == input->size)
292 {
293 cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
294 input->size = cnt;
295
296 Assert(cnt <= input_allocated_size);
297
298 /* If we have no more input to consume, we're done */
299 if (cnt == 0)
300 break;
301 }
302
303 while (input->pos < input->size)
304 {
305 /* now decompress */
306 res = ZSTD_decompressStream(zstdcs->dstream, output, input);
307
308 if (ZSTD_isError(res))
309 pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
310
311 if (output->pos == output->size)
312 break; /* No more room for output */
313
314 if (res == 0)
315 break; /* End of frame */
316 }
317
318 if (output->pos == output->size)
319 break; /* We read all the data that fits */
320 }
321
322 if (rdsize != NULL)
323 *rdsize = output->pos;
324
325 return true;
326}
327
328static bool
329Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
330{
331 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
332 ZSTD_inBuffer *input = &zstdcs->input;
333 ZSTD_outBuffer *output = &zstdcs->output;
334 size_t res,
335 cnt;
336
337 input->src = ptr;
338 input->size = size;
339 input->pos = 0;
340
341 /* Consume all input, to be flushed later */
342 while (input->pos != input->size)
343 {
344 output->pos = 0;
345 res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
346 if (ZSTD_isError(res))
347 {
348 zstdcs->zstderror = ZSTD_getErrorName(res);
349 return false;
350 }
351
352 cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
353 if (cnt != output->pos)
354 {
355 zstdcs->zstderror = strerror(errno);
356 return false;
357 }
358 }
359
360 return size;
361}
362
363static int
364Zstd_getc(CompressFileHandle *CFH)
365{
366 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
367 int ret;
368
369 if (CFH->read_func(&ret, 1, NULL, CFH) != 1)
370 {
371 if (feof(zstdcs->fp))
372 pg_fatal("could not read from input file: end of file");
373 else
374 pg_fatal("could not read from input file: %m");
375 }
376 return ret;
377}
378
379static char *
380Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
381{
382 int i;
383
384 Assert(len > 0);
385
386 /*
387 * Read one byte at a time until newline or EOF. This is only used to read
388 * the list of LOs, and the I/O is buffered anyway.
389 */
390 for (i = 0; i < len - 1; ++i)
391 {
392 size_t readsz;
393
394 if (!CFH->read_func(&buf[i], 1, &readsz, CFH))
395 break;
396 if (readsz != 1)
397 break;
398 if (buf[i] == '\n')
399 {
400 ++i;
401 break;
402 }
403 }
404 buf[i] = '\0';
405 return i > 0 ? buf : NULL;
406}
407
408static bool
409Zstd_close(CompressFileHandle *CFH)
410{
411 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
412
413 if (zstdcs->cstream)
414 {
415 size_t res,
416 cnt;
417 ZSTD_inBuffer *input = &zstdcs->input;
418 ZSTD_outBuffer *output = &zstdcs->output;
419
420 /* Loop until the compression buffers are fully consumed */
421 for (;;)
422 {
423 output->pos = 0;
424 res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
425 if (ZSTD_isError(res))
426 {
427 zstdcs->zstderror = ZSTD_getErrorName(res);
428 return false;
429 }
430
431 cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
432 if (cnt != output->pos)
433 {
434 zstdcs->zstderror = strerror(errno);
435 return false;
436 }
437
438 if (res == 0)
439 break; /* End of frame */
440 }
441
442 ZSTD_freeCStream(zstdcs->cstream);
443 pg_free(zstdcs->output.dst);
444 }
445
446 if (zstdcs->dstream)
447 {
448 ZSTD_freeDStream(zstdcs->dstream);
449 pg_free(unconstify(void *, zstdcs->input.src));
450 }
451
452 if (fclose(zstdcs->fp) != 0)
453 return false;
454
455 pg_free(zstdcs);
456 return true;
457}
458
459static bool
460Zstd_eof(CompressFileHandle *CFH)
461{
462 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
463
464 return feof(zstdcs->fp);
465}
466
467static bool
468Zstd_open(const char *path, int fd, const char *mode,
470{
471 FILE *fp;
472 ZstdCompressorState *zstdcs;
473
474 if (fd >= 0)
475 fp = fdopen(fd, mode);
476 else
477 fp = fopen(path, mode);
478
479 if (fp == NULL)
480 return false;
481
482 zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
483 CFH->private_data = zstdcs;
484 zstdcs->fp = fp;
485
486 if (mode[0] == 'r')
487 {
488 zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
489 zstdcs->dstream = ZSTD_createDStream();
490 if (zstdcs->dstream == NULL)
491 pg_fatal("could not initialize compression library");
492 }
493 else if (mode[0] == 'w' || mode[0] == 'a')
494 {
495 zstdcs->output.size = ZSTD_CStreamOutSize();
496 zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
497 zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
498 if (zstdcs->cstream == NULL)
499 pg_fatal("could not initialize compression library");
500 }
501 else
502 pg_fatal("unhandled mode \"%s\"", mode);
503
504 return true;
505}
506
507static bool
508Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
509{
510 char fname[MAXPGPATH];
511
512 sprintf(fname, "%s.zst", path);
513 return CFH->open_func(fname, -1, mode, CFH);
514}
515
516static const char *
517Zstd_get_error(CompressFileHandle *CFH)
518{
519 ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
520
521 return zstdcs->zstderror;
522}
523
524void
526 const pg_compress_specification compression_spec)
527{
528 CFH->open_func = Zstd_open;
529 CFH->open_write_func = Zstd_open_write;
530 CFH->read_func = Zstd_read;
531 CFH->write_func = Zstd_write;
532 CFH->gets_func = Zstd_gets;
533 CFH->getc_func = Zstd_getc;
534 CFH->close_func = Zstd_close;
535 CFH->eof_func = Zstd_eof;
536 CFH->get_error_func = Zstd_get_error;
537
538 CFH->compression_spec = compression_spec;
539
540 CFH->private_data = NULL;
541}
542
543#endif /* USE_ZSTD */
#define unconstify(underlying_type, expr)
Definition: c.h:1199
#define Assert(condition)
Definition: c.h:812
void InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
Definition: compress_zstd.c:23
void InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
Definition: compress_zstd.c:29
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
Definition: compression.h:30
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void pg_free(void *ptr)
Definition: fe_memutils.c:105
FILE * input
FILE * output
static struct @161 value
int i
Definition: isn.c:72
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:76
void ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
#define pg_fatal(...)
static PgChecksumMode mode
Definition: pg_checksums.c:55
#define MAXPGPATH
const void size_t len
const void * data
while(p+4<=pend)
static char * buf
Definition: pg_test_fsync.c:72
#define sprintf
Definition: port.h:240
#define strerror
Definition: port.h:251
static int fd(const char *x, int i)
Definition: preproc-init.c:105
static pg_noinline void Size size
Definition: slab.c:607
char *(* gets_func)(char *s, int size, CompressFileHandle *CFH)
Definition: compress_io.h:152
bool(* open_write_func)(const char *path, const char *mode, CompressFileHandle *CFH)
Definition: compress_io.h:122
bool(* write_func)(const void *ptr, size_t size, struct CompressFileHandle *CFH)
Definition: compress_io.h:139
int(* getc_func)(CompressFileHandle *CFH)
Definition: compress_io.h:161
const char *(* get_error_func)(CompressFileHandle *CFH)
Definition: compress_io.h:181
bool(* eof_func)(CompressFileHandle *CFH)
Definition: compress_io.h:168
bool(* open_func)(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
Definition: compress_io.h:111
pg_compress_specification compression_spec
Definition: compress_io.h:186
bool(* close_func)(CompressFileHandle *CFH)
Definition: compress_io.h:175
bool(* read_func)(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
Definition: compress_io.h:131
void * private_data
Definition: compress_io.h:87
void(* readData)(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.h:56
pg_compress_specification compression_spec
Definition: compress_io.h:82
void(* end)(ArchiveHandle *AH, CompressorState *cs)
Definition: compress_io.h:67
ReadFunc readF
Definition: compress_io.h:72
void(* writeData)(ArchiveHandle *AH, CompressorState *cs, const void *data, size_t dLen)
Definition: compress_io.h:61
WriteFunc writeF
Definition: compress_io.h:77