PostgreSQL Source Code git master
Loading...
Searching...
No Matches
astreamer_zstd.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * astreamer_zstd.c
4 *
5 * Archive streamers that deal with data compressed using zstd.
6 * astreamer_zstd_compressor applies zstd compression to the input stream,
7 * and astreamer_zstd_decompressor does the reverse.
8 *
9 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
10 *
11 * IDENTIFICATION
12 * src/fe_utils/astreamer_zstd.c
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres_fe.h"
17
18#include <unistd.h>
19
20#ifdef USE_ZSTD
21#include <zstd.h>
22#endif
23
24#include "common/logging.h"
25#include "fe_utils/astreamer.h"
26
27#ifdef USE_ZSTD
28
29typedef struct astreamer_zstd_frame
30{
31 astreamer base;
32
37
39 astreamer_member *member,
40 const char *data, int len,
43static void astreamer_zstd_compressor_free(astreamer *streamer);
44
49};
50
52 astreamer_member *member,
53 const char *data, int len,
56static void astreamer_zstd_decompressor_free(astreamer *streamer);
57
62};
63#endif
64
65/*
66 * Create a new base backup streamer that performs zstd compression of tar
67 * blocks.
 *
 * 'next' is the streamer this one is chained in front of; it must not be
 * NULL. 'compress' supplies the compression level and, when the
 * corresponding option bits are set, the worker count and long-distance
 * setting. Failure to create or configure the zstd context is reported
 * with pg_fatal().
 *
 * In a build without USE_ZSTD this function only reports a fatal error.
68 */
71{
72#ifdef USE_ZSTD
73 astreamer_zstd_frame *streamer;
74 size_t ret;
75
76 Assert(next != NULL);
77
79
80 *((const astreamer_ops **) &streamer->base.bbs_ops) =
82
83 streamer->base.bbs_next = next;
 /*
 * Allocate the buffer that zstd_outBuf is pointed at below.
 *
 * NOTE(review): the size comes from ZSTD_DStreamOutSize(), the same call
 * the decompressor uses; ZSTD_CStreamOutSize() may be the intended
 * recommendation for a compression output buffer -- confirm.
 */
84 initStringInfo(&streamer->base.bbs_buffer);
85 enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
86
87 streamer->cctx = ZSTD_createCCtx();
88 if (!streamer->cctx)
89 pg_fatal("could not create zstd compression context");
90
91 /* Set compression level */
93 compress->level);
94 if (ZSTD_isError(ret))
95 pg_fatal("could not set zstd compression level to %d: %s",
96 compress->level, ZSTD_getErrorName(ret));
97
98 /* Set # of workers, if specified */
99 if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
100 {
101 /*
102 * On older versions of libzstd, this option does not exist, and
103 * trying to set it will fail. Similarly for newer versions if they
104 * are compiled without threading support.
105 */
106 ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
107 compress->workers);
108 if (ZSTD_isError(ret))
109 pg_fatal("could not set compression worker count to %d: %s",
110 compress->workers, ZSTD_getErrorName(ret));
111 }
112
 /* Apply the long-distance mode setting, if specified */
113 if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
114 {
115 ret = ZSTD_CCtx_setParameter(streamer->cctx,
117 compress->long_distance);
118 if (ZSTD_isError(ret))
119 pg_fatal("could not enable long-distance mode: %s",
120 ZSTD_getErrorName(ret));
121 }
122
123 /* Initialize the ZSTD output buffer. */
124 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
125 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
126 streamer->zstd_outBuf.pos = 0;
127
128 return &streamer->base;
129#else
130 pg_fatal("this build does not support compression with %s", "ZSTD");
131 return NULL; /* keep compiler quiet */
132#endif
133}
134
135#ifdef USE_ZSTD
136/*
137 * Compress the input data to output buffer.
138 *
139 * On every pass through the loop, compute the compression bound for the
140 * input that is still unconsumed. If the free space in the output buffer
141 * is smaller than that bound, forward the current contents of the output
142 * buffer to the next streamer and reset the buffer before compressing
143 * further.
 *
 * 'data' and 'len' describe the bytes to compress. 'member' and 'context'
 * are handed unchanged to the next streamer whenever the buffer is
 * forwarded.
144 */
145static void
147 astreamer_member *member,
148 const char *data, int len,
150{
 /* Wrap the caller's bytes; the loop below runs until pos reaches size. */
152 ZSTD_inBuffer inBuf = {data, len, 0};
153
154 while (inBuf.pos < inBuf.size)
155 {
156 size_t yet_to_flush;
157 size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
158
159 /*
160 * If the output buffer is not left with enough space, send the
161 * compressed bytes to the next streamer, and empty the buffer.
162 */
163 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
165 {
166 astreamer_content(mystreamer->base.bbs_next, member,
167 mystreamer->zstd_outBuf.dst,
168 mystreamer->zstd_outBuf.pos,
169 context);
170
171 /* Reset the ZSTD output buffer. */
172 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
173 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
174 mystreamer->zstd_outBuf.pos = 0;
175 }
176
178 ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
180
182 pg_fatal("could not compress data: %s",
184 }
185}
186
187/*
188 * End-of-stream processing.
 *
 * Ask zstd to end the stream (ZSTD_e_end) with an empty input buffer,
 * repeating until it reports that nothing is left to flush. Before each
 * attempt, forward and reset the output buffer if it lacks room. Then
 * forward any bytes still in the output buffer and finalize the next
 * streamer.
189 */
190static void
192{
194 size_t yet_to_flush;
195
196 do
197 {
 /* No new input at end of stream; only buffered data is flushed. */
198 ZSTD_inBuffer in = {NULL, 0, 0};
199 size_t max_needed = ZSTD_compressBound(0);
200
201 /*
202 * If the output buffer is not left with enough space, send the
203 * compressed bytes to the next streamer, and empty the buffer.
204 */
205 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
207 {
208 astreamer_content(mystreamer->base.bbs_next, NULL,
209 mystreamer->zstd_outBuf.dst,
210 mystreamer->zstd_outBuf.pos,
212
213 /* Reset the ZSTD output buffer. */
214 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
215 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
216 mystreamer->zstd_outBuf.pos = 0;
217 }
218
220 &mystreamer->zstd_outBuf,
221 &in, ZSTD_e_end);
222
224 pg_fatal("could not compress data: %s",
226
227 } while (yet_to_flush > 0);
228
229 /* Make sure to pass any remaining bytes to the next streamer. */
230 if (mystreamer->zstd_outBuf.pos > 0)
231 astreamer_content(mystreamer->base.bbs_next, NULL,
232 mystreamer->zstd_outBuf.dst,
233 mystreamer->zstd_outBuf.pos,
235
236 astreamer_finalize(mystreamer->base.bbs_next);
237}
238
239/*
240 * Free memory.
 *
 * Frees the next streamer in the chain first, then this streamer's
 * buffer and the streamer object itself.
241 */
242static void
244{
246
247 astreamer_free(streamer->bbs_next);
249 pfree(streamer->bbs_buffer.data);
250 pfree(streamer);
251}
252#endif
253
254/*
255 * Create a new base backup streamer that performs decompression of zstd
256 * compressed blocks.
 *
 * 'next' is the streamer this one is chained in front of; it must not be
 * NULL. Failure to create the zstd decompression context is reported with
 * pg_fatal().
 *
 * In a build without USE_ZSTD this function only reports a fatal error.
257 */
258astreamer *
260{
261#ifdef USE_ZSTD
262 astreamer_zstd_frame *streamer;
263
264 Assert(next != NULL);
265
267 *((const astreamer_ops **) &streamer->base.bbs_ops) =
269
270 streamer->base.bbs_next = next;
 /* Allocate the buffer that zstd_outBuf is pointed at below. */
271 initStringInfo(&streamer->base.bbs_buffer);
272 enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
273
274 streamer->dctx = ZSTD_createDCtx();
275 if (!streamer->dctx)
276 pg_fatal("could not create zstd decompression context");
277
278 /* Initialize the ZSTD output buffer. */
279 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
280 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
281 streamer->zstd_outBuf.pos = 0;
282
283 return &streamer->base;
284#else
285 pg_fatal("this build does not support compression with %s", "ZSTD");
286 return NULL; /* keep compiler quiet */
287#endif
288}
289
290#ifdef USE_ZSTD
291/*
292 * Decompress the input data to output buffer until we run out of input
293 * data. Each time the output buffer is full, pass on the decompressed data
294 * to the next streamer.
 *
 * 'data' and 'len' describe the compressed bytes to consume. 'member' and
 * 'context' are handed unchanged to the next streamer whenever the buffer
 * is forwarded. Output that has not filled the buffer when the input runs
 * out is left in the buffer; this function does not forward it.
295 */
296static void
298 astreamer_member *member,
299 const char *data, int len,
301{
 /* Wrap the caller's bytes; the loop below runs until pos reaches size. */
303 ZSTD_inBuffer inBuf = {data, len, 0};
304
305 while (inBuf.pos < inBuf.size)
306 {
307 size_t ret;
308
309 /*
310 * If output buffer is full then forward the content to next streamer
311 * and update the output buffer.
312 */
313 if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
314 {
315 astreamer_content(mystreamer->base.bbs_next, member,
316 mystreamer->zstd_outBuf.dst,
317 mystreamer->zstd_outBuf.pos,
318 context);
319
320 /* Reset the ZSTD output buffer. */
321 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
322 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
323 mystreamer->zstd_outBuf.pos = 0;
324 }
325
327 &mystreamer->zstd_outBuf, &inBuf);
328
329 if (ZSTD_isError(ret))
330 pg_fatal("could not decompress data: %s",
331 ZSTD_getErrorName(ret));
332 }
333}
334
335/*
336 * End-of-stream processing.
 *
 * Forward whatever decompressed bytes are still pending in the output
 * buffer to the next streamer, then finalize the next streamer.
337 */
338static void
340{
342
343 /*
344 * End of the stream, if there is some pending data in output buffers then
345 * we must forward it to next streamer.
 *
 * Forward exactly zstd_outBuf.pos bytes, as every other forwarding site
 * in this file does. Passing bbs_buffer.maxlen (the size of the whole
 * allocation) would hand the next streamer stale bytes beyond the data
 * actually decompressed.
346 */
347 if (mystreamer->zstd_outBuf.pos > 0)
348 astreamer_content(mystreamer->base.bbs_next, NULL,
349 mystreamer->zstd_outBuf.dst,
350 mystreamer->zstd_outBuf.pos,
352
353 astreamer_finalize(mystreamer->base.bbs_next);
354}
355
356/*
357 * Free memory.
 *
 * Frees the next streamer in the chain first, then this streamer's
 * buffer and the streamer object itself.
358 */
359static void
361{
363
364 astreamer_free(streamer->bbs_next);
366 pfree(streamer->bbs_buffer.data);
367 pfree(streamer);
368}
369#endif
static void astreamer_free(astreamer *streamer)
Definition astreamer.h:153
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition astreamer.h:135
static void astreamer_finalize(astreamer *streamer)
Definition astreamer.h:145
astreamer_archive_context
Definition astreamer.h:63
@ ASTREAMER_UNKNOWN
Definition astreamer.h:64
astreamer * astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
astreamer * astreamer_zstd_decompressor_new(astreamer *next)
static int32 next
Definition blutils.c:225
#define Assert(condition)
Definition c.h:945
#define PG_COMPRESSION_OPTION_WORKERS
Definition compression.h:29
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
Definition compression.h:30
#define palloc0_object(type)
Definition fe_memutils.h:75
void pfree(void *pointer)
Definition mcxt.c:1616
#define pg_fatal(...)
const void size_t len
const void * data
static int fb(int x)
void enlargeStringInfo(StringInfo str, int needed)
Definition stringinfo.c:337
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition astreamer.h:126
StringInfoData bbs_buffer
Definition astreamer.h:111
astreamer * bbs_next
Definition astreamer.h:110