25 typedef struct bbstreamer_zstd_frame
31 ZSTD_outBuffer zstd_outBuf;
32 } bbstreamer_zstd_frame;
34 static void bbstreamer_zstd_compressor_content(
bbstreamer *streamer,
38 static void bbstreamer_zstd_compressor_finalize(
bbstreamer *streamer);
39 static void bbstreamer_zstd_compressor_free(
bbstreamer *streamer);
42 .
content = bbstreamer_zstd_compressor_content,
43 .finalize = bbstreamer_zstd_compressor_finalize,
44 .free = bbstreamer_zstd_compressor_free
47 static void bbstreamer_zstd_decompressor_content(
bbstreamer *streamer,
51 static void bbstreamer_zstd_decompressor_finalize(
bbstreamer *streamer);
52 static void bbstreamer_zstd_decompressor_free(
bbstreamer *streamer);
55 .
content = bbstreamer_zstd_decompressor_content,
56 .finalize = bbstreamer_zstd_decompressor_finalize,
57 .free = bbstreamer_zstd_decompressor_free
69 bbstreamer_zstd_frame *streamer;
74 streamer =
palloc0(
sizeof(bbstreamer_zstd_frame));
77 &bbstreamer_zstd_compressor_ops;
79 streamer->base.bbs_next =
next;
83 streamer->cctx = ZSTD_createCCtx();
85 pg_fatal(
"could not create zstd compression context");
90 ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
92 if (ZSTD_isError(ret))
93 pg_fatal(
"could not set zstd compression level to %d: %s",
94 compress->
level, ZSTD_getErrorName(ret));
105 ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
107 if (ZSTD_isError(ret))
108 pg_fatal(
"could not set compression worker count to %d: %s",
109 compress->
workers, ZSTD_getErrorName(ret));
113 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
114 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
115 streamer->zstd_outBuf.pos = 0;
117 return &streamer->base;
119 pg_fatal(
"this build does not support zstd compression");
135 bbstreamer_zstd_compressor_content(
bbstreamer *streamer,
140 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
141 ZSTD_inBuffer inBuf = {
data,
len, 0};
143 while (inBuf.pos < inBuf.size)
146 size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
152 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
156 mystreamer->zstd_outBuf.dst,
157 mystreamer->zstd_outBuf.pos,
161 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
162 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
163 mystreamer->zstd_outBuf.pos = 0;
167 ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
168 &inBuf, ZSTD_e_continue);
170 if (ZSTD_isError(yet_to_flush))
172 ZSTD_getErrorName(yet_to_flush));
180 bbstreamer_zstd_compressor_finalize(
bbstreamer *streamer)
182 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
187 ZSTD_inBuffer in = {NULL, 0, 0};
188 size_t max_needed = ZSTD_compressBound(0);
194 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
198 mystreamer->zstd_outBuf.dst,
199 mystreamer->zstd_outBuf.pos,
203 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
204 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
205 mystreamer->zstd_outBuf.pos = 0;
208 yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
209 &mystreamer->zstd_outBuf,
212 if (ZSTD_isError(yet_to_flush))
214 ZSTD_getErrorName(yet_to_flush));
216 }
while (yet_to_flush > 0);
219 if (mystreamer->zstd_outBuf.pos > 0)
221 mystreamer->zstd_outBuf.dst,
222 mystreamer->zstd_outBuf.pos,
232 bbstreamer_zstd_compressor_free(
bbstreamer *streamer)
234 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
237 ZSTD_freeCCtx(mystreamer->cctx);
251 bbstreamer_zstd_frame *streamer;
255 streamer =
palloc0(
sizeof(bbstreamer_zstd_frame));
257 &bbstreamer_zstd_decompressor_ops;
259 streamer->base.bbs_next =
next;
263 streamer->dctx = ZSTD_createDCtx();
265 pg_fatal(
"could not create zstd decompression context");
268 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
269 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
270 streamer->zstd_outBuf.pos = 0;
272 return &streamer->base;
274 pg_fatal(
"this build does not support zstd compression");
286 bbstreamer_zstd_decompressor_content(
bbstreamer *streamer,
291 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
292 ZSTD_inBuffer inBuf = {
data,
len, 0};
294 while (inBuf.pos < inBuf.size)
302 if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
305 mystreamer->zstd_outBuf.dst,
306 mystreamer->zstd_outBuf.pos,
310 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
311 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
312 mystreamer->zstd_outBuf.pos = 0;
315 ret = ZSTD_decompressStream(mystreamer->dctx,
316 &mystreamer->zstd_outBuf, &inBuf);
318 if (ZSTD_isError(ret))
320 ZSTD_getErrorName(ret));
328 bbstreamer_zstd_decompressor_finalize(
bbstreamer *streamer)
330 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
336 if (mystreamer->zstd_outBuf.pos > 0)
338 mystreamer->base.bbs_buffer.data,
339 mystreamer->base.bbs_buffer.maxlen,
349 bbstreamer_zstd_decompressor_free(
bbstreamer *streamer)
351 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
354 ZSTD_freeDCtx(mystreamer->dctx);
static void bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
static void bbstreamer_finalize(bbstreamer *streamer)
bbstreamer_archive_context
static void bbstreamer_free(bbstreamer *streamer)
bbstreamer * bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress)
bbstreamer * bbstreamer_zstd_decompressor_new(bbstreamer *next)
#define PG_COMPRESSION_OPTION_LEVEL
#define PG_COMPRESSION_OPTION_WORKERS
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
void pfree(void *pointer)
void * palloc0(Size size)
void enlargeStringInfo(StringInfo str, int needed)
void initStringInfo(StringInfo str)
void(* content)(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
StringInfoData bbs_buffer