PostgreSQL Source Code git master
astreamer_zstd.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * astreamer_zstd.c
4 *
5 * Archive streamers that deal with data compressed using zstd.
6 * astreamer_zstd_compressor applies zstd compression to the input stream,
7 * and astreamer_zstd_decompressor does the reverse.
8 *
9 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
10 *
11 * IDENTIFICATION
12 * src/fe_utils/astreamer_zstd.c
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres_fe.h"
17
18#include <unistd.h>
19
20#ifdef USE_ZSTD
21#include <zstd.h>
22#endif
23
24#include "common/logging.h"
25#include "fe_utils/astreamer.h"
26
27#ifdef USE_ZSTD
28
29typedef struct astreamer_zstd_frame
30{
31 astreamer base;
32
33 ZSTD_CCtx *cctx;
34 ZSTD_DCtx *dctx;
35 ZSTD_outBuffer zstd_outBuf;
36} astreamer_zstd_frame;
37
38static void astreamer_zstd_compressor_content(astreamer *streamer,
39 astreamer_member *member,
40 const char *data, int len,
42static void astreamer_zstd_compressor_finalize(astreamer *streamer);
43static void astreamer_zstd_compressor_free(astreamer *streamer);
44
45static const astreamer_ops astreamer_zstd_compressor_ops = {
46 .content = astreamer_zstd_compressor_content,
47 .finalize = astreamer_zstd_compressor_finalize,
48 .free = astreamer_zstd_compressor_free
49};
50
51static void astreamer_zstd_decompressor_content(astreamer *streamer,
52 astreamer_member *member,
53 const char *data, int len,
55static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
56static void astreamer_zstd_decompressor_free(astreamer *streamer);
57
58static const astreamer_ops astreamer_zstd_decompressor_ops = {
59 .content = astreamer_zstd_decompressor_content,
60 .finalize = astreamer_zstd_decompressor_finalize,
61 .free = astreamer_zstd_decompressor_free
62};
63#endif
64
65/*
66 * Create a new base backup streamer that performs zstd compression of tar
67 * blocks.
68 */
71{
72#ifdef USE_ZSTD
73 astreamer_zstd_frame *streamer;
74 size_t ret;
75
76 Assert(next != NULL);
77
78 streamer = palloc0(sizeof(astreamer_zstd_frame));
79
80 *((const astreamer_ops **) &streamer->base.bbs_ops) =
81 &astreamer_zstd_compressor_ops;
82
83 streamer->base.bbs_next = next;
84 initStringInfo(&streamer->base.bbs_buffer);
85 enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
86
87 streamer->cctx = ZSTD_createCCtx();
88 if (!streamer->cctx)
89 pg_fatal("could not create zstd compression context");
90
91 /* Set compression level */
92 ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
93 compress->level);
94 if (ZSTD_isError(ret))
95 pg_fatal("could not set zstd compression level to %d: %s",
96 compress->level, ZSTD_getErrorName(ret));
97
98 /* Set # of workers, if specified */
99 if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
100 {
101 /*
102 * On older versions of libzstd, this option does not exist, and
103 * trying to set it will fail. Similarly for newer versions if they
104 * are compiled without threading support.
105 */
106 ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
107 compress->workers);
108 if (ZSTD_isError(ret))
109 pg_fatal("could not set compression worker count to %d: %s",
110 compress->workers, ZSTD_getErrorName(ret));
111 }
112
113 if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
114 {
115 ret = ZSTD_CCtx_setParameter(streamer->cctx,
116 ZSTD_c_enableLongDistanceMatching,
117 compress->long_distance);
118 if (ZSTD_isError(ret))
119 {
120 pg_log_error("could not enable long-distance mode: %s",
121 ZSTD_getErrorName(ret));
122 exit(1);
123 }
124 }
125
126 /* Initialize the ZSTD output buffer. */
127 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
128 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
129 streamer->zstd_outBuf.pos = 0;
130
131 return &streamer->base;
132#else
133 pg_fatal("this build does not support compression with %s", "ZSTD");
134 return NULL; /* keep compiler quiet */
135#endif
136}
137
138#ifdef USE_ZSTD
139/*
140 * Compress the input data to output buffer.
141 *
142 * Find out the compression bound based on input data length for each
143 * invocation to make sure that output buffer has enough capacity to
144 * accommodate the compressed data. In case if the output buffer
145 * capacity falls short of compression bound then forward the content
146 * of output buffer to next streamer and empty the buffer.
147 */
148static void
149astreamer_zstd_compressor_content(astreamer *streamer,
150 astreamer_member *member,
151 const char *data, int len,
153{
154 astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
155 ZSTD_inBuffer inBuf = {data, len, 0};
156
157 while (inBuf.pos < inBuf.size)
158 {
159 size_t yet_to_flush;
160 size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
161
162 /*
163 * If the output buffer is not left with enough space, send the
164 * compressed bytes to the next streamer, and empty the buffer.
165 */
166 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
167 max_needed)
168 {
169 astreamer_content(mystreamer->base.bbs_next, member,
170 mystreamer->zstd_outBuf.dst,
171 mystreamer->zstd_outBuf.pos,
172 context);
173
174 /* Reset the ZSTD output buffer. */
175 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
176 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
177 mystreamer->zstd_outBuf.pos = 0;
178 }
179
180 yet_to_flush =
181 ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
182 &inBuf, ZSTD_e_continue);
183
184 if (ZSTD_isError(yet_to_flush))
185 pg_log_error("could not compress data: %s",
186 ZSTD_getErrorName(yet_to_flush));
187 }
188}
189
190/*
191 * End-of-stream processing.
192 */
193static void
194astreamer_zstd_compressor_finalize(astreamer *streamer)
195{
196 astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
197 size_t yet_to_flush;
198
199 do
200 {
201 ZSTD_inBuffer in = {NULL, 0, 0};
202 size_t max_needed = ZSTD_compressBound(0);
203
204 /*
205 * If the output buffer is not left with enough space, send the
206 * compressed bytes to the next streamer, and empty the buffer.
207 */
208 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
209 max_needed)
210 {
211 astreamer_content(mystreamer->base.bbs_next, NULL,
212 mystreamer->zstd_outBuf.dst,
213 mystreamer->zstd_outBuf.pos,
215
216 /* Reset the ZSTD output buffer. */
217 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
218 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
219 mystreamer->zstd_outBuf.pos = 0;
220 }
221
222 yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
223 &mystreamer->zstd_outBuf,
224 &in, ZSTD_e_end);
225
226 if (ZSTD_isError(yet_to_flush))
227 pg_log_error("could not compress data: %s",
228 ZSTD_getErrorName(yet_to_flush));
229
230 } while (yet_to_flush > 0);
231
232 /* Make sure to pass any remaining bytes to the next streamer. */
233 if (mystreamer->zstd_outBuf.pos > 0)
234 astreamer_content(mystreamer->base.bbs_next, NULL,
235 mystreamer->zstd_outBuf.dst,
236 mystreamer->zstd_outBuf.pos,
238
239 astreamer_finalize(mystreamer->base.bbs_next);
240}
241
242/*
243 * Free memory.
244 */
245static void
246astreamer_zstd_compressor_free(astreamer *streamer)
247{
248 astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
249
250 astreamer_free(streamer->bbs_next);
251 ZSTD_freeCCtx(mystreamer->cctx);
252 pfree(streamer->bbs_buffer.data);
253 pfree(streamer);
254}
255#endif
256
257/*
258 * Create a new base backup streamer that performs decompression of zstd
259 * compressed blocks.
260 */
261astreamer *
263{
264#ifdef USE_ZSTD
265 astreamer_zstd_frame *streamer;
266
267 Assert(next != NULL);
268
269 streamer = palloc0(sizeof(astreamer_zstd_frame));
270 *((const astreamer_ops **) &streamer->base.bbs_ops) =
271 &astreamer_zstd_decompressor_ops;
272
273 streamer->base.bbs_next = next;
274 initStringInfo(&streamer->base.bbs_buffer);
275 enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
276
277 streamer->dctx = ZSTD_createDCtx();
278 if (!streamer->dctx)
279 pg_fatal("could not create zstd decompression context");
280
281 /* Initialize the ZSTD output buffer. */
282 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
283 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
284 streamer->zstd_outBuf.pos = 0;
285
286 return &streamer->base;
287#else
288 pg_fatal("this build does not support compression with %s", "ZSTD");
289 return NULL; /* keep compiler quiet */
290#endif
291}
292
293#ifdef USE_ZSTD
294/*
295 * Decompress the input data to output buffer until we run out of input
296 * data. Each time the output buffer is full, pass on the decompressed data
297 * to the next streamer.
298 */
299static void
300astreamer_zstd_decompressor_content(astreamer *streamer,
301 astreamer_member *member,
302 const char *data, int len,
304{
305 astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
306 ZSTD_inBuffer inBuf = {data, len, 0};
307
308 while (inBuf.pos < inBuf.size)
309 {
310 size_t ret;
311
312 /*
313 * If output buffer is full then forward the content to next streamer
314 * and update the output buffer.
315 */
316 if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
317 {
318 astreamer_content(mystreamer->base.bbs_next, member,
319 mystreamer->zstd_outBuf.dst,
320 mystreamer->zstd_outBuf.pos,
321 context);
322
323 /* Reset the ZSTD output buffer. */
324 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
325 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
326 mystreamer->zstd_outBuf.pos = 0;
327 }
328
329 ret = ZSTD_decompressStream(mystreamer->dctx,
330 &mystreamer->zstd_outBuf, &inBuf);
331
332 if (ZSTD_isError(ret))
333 pg_log_error("could not decompress data: %s",
334 ZSTD_getErrorName(ret));
335 }
336}
337
338/*
339 * End-of-stream processing.
340 */
341static void
342astreamer_zstd_decompressor_finalize(astreamer *streamer)
343{
344 astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
345
346 /*
347 * End of the stream, if there is some pending data in output buffers then
348 * we must forward it to next streamer.
349 */
350 if (mystreamer->zstd_outBuf.pos > 0)
351 astreamer_content(mystreamer->base.bbs_next, NULL,
352 mystreamer->base.bbs_buffer.data,
353 mystreamer->base.bbs_buffer.maxlen,
355
356 astreamer_finalize(mystreamer->base.bbs_next);
357}
358
359/*
360 * Free memory.
361 */
362static void
363astreamer_zstd_decompressor_free(astreamer *streamer)
364{
365 astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
366
367 astreamer_free(streamer->bbs_next);
368 ZSTD_freeDCtx(mystreamer->dctx);
369 pfree(streamer->bbs_buffer.data);
370 pfree(streamer);
371}
372#endif
static void astreamer_free(astreamer *streamer)
Definition: astreamer.h:153
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition: astreamer.h:135
static void astreamer_finalize(astreamer *streamer)
Definition: astreamer.h:145
astreamer_archive_context
Definition: astreamer.h:63
@ ASTREAMER_UNKNOWN
Definition: astreamer.h:64
astreamer * astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
astreamer * astreamer_zstd_decompressor_new(astreamer *next)
static int32 next
Definition: blutils.c:224
#define PG_COMPRESSION_OPTION_WORKERS
Definition: compression.h:29
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
Definition: compression.h:30
Assert(PointerIsAligned(start, uint64))
#define pg_log_error(...)
Definition: logging.h:106
void pfree(void *pointer)
Definition: mcxt.c:1524
void * palloc0(Size size)
Definition: mcxt.c:1347
#define pg_fatal(...)
const void size_t len
const void * data
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:337
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition: astreamer.h:126
StringInfoData bbs_buffer
Definition: astreamer.h:111
astreamer * bbs_next
Definition: astreamer.h:110