25 typedef struct bbstreamer_zstd_frame
31 ZSTD_outBuffer zstd_outBuf;
32 } bbstreamer_zstd_frame;
34 static void bbstreamer_zstd_compressor_content(
bbstreamer *streamer,
38 static void bbstreamer_zstd_compressor_finalize(
bbstreamer *streamer);
39 static void bbstreamer_zstd_compressor_free(
bbstreamer *streamer);
42 .
content = bbstreamer_zstd_compressor_content,
43 .finalize = bbstreamer_zstd_compressor_finalize,
44 .free = bbstreamer_zstd_compressor_free
47 static void bbstreamer_zstd_decompressor_content(
bbstreamer *streamer,
51 static void bbstreamer_zstd_decompressor_finalize(
bbstreamer *streamer);
52 static void bbstreamer_zstd_decompressor_free(
bbstreamer *streamer);
55 .
content = bbstreamer_zstd_decompressor_content,
56 .finalize = bbstreamer_zstd_decompressor_finalize,
57 .free = bbstreamer_zstd_decompressor_free
69 bbstreamer_zstd_frame *streamer;
74 streamer =
palloc0(
sizeof(bbstreamer_zstd_frame));
77 &bbstreamer_zstd_compressor_ops;
79 streamer->base.bbs_next =
next;
83 streamer->cctx = ZSTD_createCCtx();
85 pg_fatal(
"could not create zstd compression context");
88 ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
90 if (ZSTD_isError(ret))
91 pg_fatal(
"could not set zstd compression level to %d: %s",
92 compress->
level, ZSTD_getErrorName(ret));
102 ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
104 if (ZSTD_isError(ret))
105 pg_fatal(
"could not set compression worker count to %d: %s",
106 compress->
workers, ZSTD_getErrorName(ret));
111 ret = ZSTD_CCtx_setParameter(streamer->cctx,
112 ZSTD_c_enableLongDistanceMatching,
114 if (ZSTD_isError(ret))
117 ZSTD_getErrorName(ret));
123 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
124 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
125 streamer->zstd_outBuf.pos = 0;
127 return &streamer->base;
129 pg_fatal(
"this build does not support compression with %s",
"ZSTD");
145 bbstreamer_zstd_compressor_content(
bbstreamer *streamer,
150 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
151 ZSTD_inBuffer inBuf = {
data,
len, 0};
153 while (inBuf.pos < inBuf.size)
156 size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
162 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
166 mystreamer->zstd_outBuf.dst,
167 mystreamer->zstd_outBuf.pos,
171 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
172 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
173 mystreamer->zstd_outBuf.pos = 0;
177 ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
178 &inBuf, ZSTD_e_continue);
180 if (ZSTD_isError(yet_to_flush))
182 ZSTD_getErrorName(yet_to_flush));
190 bbstreamer_zstd_compressor_finalize(
bbstreamer *streamer)
192 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
197 ZSTD_inBuffer in = {NULL, 0, 0};
198 size_t max_needed = ZSTD_compressBound(0);
204 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
208 mystreamer->zstd_outBuf.dst,
209 mystreamer->zstd_outBuf.pos,
213 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
214 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
215 mystreamer->zstd_outBuf.pos = 0;
218 yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
219 &mystreamer->zstd_outBuf,
222 if (ZSTD_isError(yet_to_flush))
224 ZSTD_getErrorName(yet_to_flush));
226 }
while (yet_to_flush > 0);
229 if (mystreamer->zstd_outBuf.pos > 0)
231 mystreamer->zstd_outBuf.dst,
232 mystreamer->zstd_outBuf.pos,
242 bbstreamer_zstd_compressor_free(
bbstreamer *streamer)
244 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
247 ZSTD_freeCCtx(mystreamer->cctx);
261 bbstreamer_zstd_frame *streamer;
265 streamer =
palloc0(
sizeof(bbstreamer_zstd_frame));
267 &bbstreamer_zstd_decompressor_ops;
269 streamer->base.bbs_next =
next;
273 streamer->dctx = ZSTD_createDCtx();
275 pg_fatal(
"could not create zstd decompression context");
278 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
279 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
280 streamer->zstd_outBuf.pos = 0;
282 return &streamer->base;
284 pg_fatal(
"this build does not support compression with %s",
"ZSTD");
296 bbstreamer_zstd_decompressor_content(
bbstreamer *streamer,
301 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
302 ZSTD_inBuffer inBuf = {
data,
len, 0};
304 while (inBuf.pos < inBuf.size)
312 if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
315 mystreamer->zstd_outBuf.dst,
316 mystreamer->zstd_outBuf.pos,
320 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
321 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
322 mystreamer->zstd_outBuf.pos = 0;
325 ret = ZSTD_decompressStream(mystreamer->dctx,
326 &mystreamer->zstd_outBuf, &inBuf);
328 if (ZSTD_isError(ret))
330 ZSTD_getErrorName(ret));
338 bbstreamer_zstd_decompressor_finalize(
bbstreamer *streamer)
340 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
346 if (mystreamer->zstd_outBuf.pos > 0)
348 mystreamer->base.bbs_buffer.data,
349 mystreamer->base.bbs_buffer.maxlen,
359 bbstreamer_zstd_decompressor_free(
bbstreamer *streamer)
361 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
364 ZSTD_freeDCtx(mystreamer->dctx);
static void bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
static void bbstreamer_finalize(bbstreamer *streamer)
bbstreamer_archive_context
static void bbstreamer_free(bbstreamer *streamer)
bbstreamer * bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress)
bbstreamer * bbstreamer_zstd_decompressor_new(bbstreamer *next)
#define PG_COMPRESSION_OPTION_WORKERS
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
void pfree(void *pointer)
void * palloc0(Size size)
void enlargeStringInfo(StringInfo str, int needed)
void initStringInfo(StringInfo str)
void(* content)(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
StringInfoData bbs_buffer