25 typedef struct bbstreamer_zstd_frame
31 ZSTD_outBuffer zstd_outBuf;
32 } bbstreamer_zstd_frame;
34 static void bbstreamer_zstd_compressor_content(
bbstreamer *streamer,
38 static void bbstreamer_zstd_compressor_finalize(
bbstreamer *streamer);
39 static void bbstreamer_zstd_compressor_free(
bbstreamer *streamer);
42 .
content = bbstreamer_zstd_compressor_content,
43 .finalize = bbstreamer_zstd_compressor_finalize,
44 .free = bbstreamer_zstd_compressor_free
47 static void bbstreamer_zstd_decompressor_content(
bbstreamer *streamer,
51 static void bbstreamer_zstd_decompressor_finalize(
bbstreamer *streamer);
52 static void bbstreamer_zstd_decompressor_free(
bbstreamer *streamer);
55 .
content = bbstreamer_zstd_decompressor_content,
56 .finalize = bbstreamer_zstd_decompressor_finalize,
57 .free = bbstreamer_zstd_decompressor_free
69 bbstreamer_zstd_frame *streamer;
74 streamer =
palloc0(
sizeof(bbstreamer_zstd_frame));
77 &bbstreamer_zstd_compressor_ops;
79 streamer->base.bbs_next =
next;
83 streamer->cctx = ZSTD_createCCtx();
85 pg_fatal(
"could not create zstd compression context");
88 ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
90 if (ZSTD_isError(ret))
91 pg_fatal(
"could not set zstd compression level to %d: %s",
92 compress->
level, ZSTD_getErrorName(ret));
102 ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
104 if (ZSTD_isError(ret))
105 pg_fatal(
"could not set compression worker count to %d: %s",
106 compress->
workers, ZSTD_getErrorName(ret));
110 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
111 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
112 streamer->zstd_outBuf.pos = 0;
114 return &streamer->base;
116 pg_fatal(
"this build does not support compression with %s",
"ZSTD");
132 bbstreamer_zstd_compressor_content(
bbstreamer *streamer,
137 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
138 ZSTD_inBuffer inBuf = {
data,
len, 0};
140 while (inBuf.pos < inBuf.size)
143 size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
149 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
153 mystreamer->zstd_outBuf.dst,
154 mystreamer->zstd_outBuf.pos,
158 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
159 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
160 mystreamer->zstd_outBuf.pos = 0;
164 ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
165 &inBuf, ZSTD_e_continue);
167 if (ZSTD_isError(yet_to_flush))
169 ZSTD_getErrorName(yet_to_flush));
177 bbstreamer_zstd_compressor_finalize(
bbstreamer *streamer)
179 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
184 ZSTD_inBuffer in = {NULL, 0, 0};
185 size_t max_needed = ZSTD_compressBound(0);
191 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
195 mystreamer->zstd_outBuf.dst,
196 mystreamer->zstd_outBuf.pos,
200 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
201 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
202 mystreamer->zstd_outBuf.pos = 0;
205 yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
206 &mystreamer->zstd_outBuf,
209 if (ZSTD_isError(yet_to_flush))
211 ZSTD_getErrorName(yet_to_flush));
213 }
while (yet_to_flush > 0);
216 if (mystreamer->zstd_outBuf.pos > 0)
218 mystreamer->zstd_outBuf.dst,
219 mystreamer->zstd_outBuf.pos,
229 bbstreamer_zstd_compressor_free(
bbstreamer *streamer)
231 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
234 ZSTD_freeCCtx(mystreamer->cctx);
248 bbstreamer_zstd_frame *streamer;
252 streamer =
palloc0(
sizeof(bbstreamer_zstd_frame));
254 &bbstreamer_zstd_decompressor_ops;
256 streamer->base.bbs_next =
next;
260 streamer->dctx = ZSTD_createDCtx();
262 pg_fatal(
"could not create zstd decompression context");
265 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
266 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
267 streamer->zstd_outBuf.pos = 0;
269 return &streamer->base;
271 pg_fatal(
"this build does not support compression with %s",
"ZSTD");
283 bbstreamer_zstd_decompressor_content(
bbstreamer *streamer,
288 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
289 ZSTD_inBuffer inBuf = {
data,
len, 0};
291 while (inBuf.pos < inBuf.size)
299 if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
302 mystreamer->zstd_outBuf.dst,
303 mystreamer->zstd_outBuf.pos,
307 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
308 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
309 mystreamer->zstd_outBuf.pos = 0;
312 ret = ZSTD_decompressStream(mystreamer->dctx,
313 &mystreamer->zstd_outBuf, &inBuf);
315 if (ZSTD_isError(ret))
317 ZSTD_getErrorName(ret));
325 bbstreamer_zstd_decompressor_finalize(
bbstreamer *streamer)
327 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
333 if (mystreamer->zstd_outBuf.pos > 0)
335 mystreamer->base.bbs_buffer.data,
336 mystreamer->base.bbs_buffer.maxlen,
346 bbstreamer_zstd_decompressor_free(
bbstreamer *streamer)
348 bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
351 ZSTD_freeDCtx(mystreamer->dctx);
static void bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
static void bbstreamer_finalize(bbstreamer *streamer)
bbstreamer_archive_context
static void bbstreamer_free(bbstreamer *streamer)
bbstreamer * bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress)
bbstreamer * bbstreamer_zstd_decompressor_new(bbstreamer *next)
#define PG_COMPRESSION_OPTION_WORKERS
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
void pfree(void *pointer)
void * palloc0(Size size)
void enlargeStringInfo(StringInfo str, int needed)
void initStringInfo(StringInfo str)
void(* content)(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
StringInfoData bbs_buffer