29 typedef struct astreamer_zstd_frame
35 ZSTD_outBuffer zstd_outBuf;
36 } astreamer_zstd_frame;
38 static void astreamer_zstd_compressor_content(
astreamer *streamer,
42 static void astreamer_zstd_compressor_finalize(
astreamer *streamer);
43 static void astreamer_zstd_compressor_free(
astreamer *streamer);
46 .
content = astreamer_zstd_compressor_content,
47 .finalize = astreamer_zstd_compressor_finalize,
48 .free = astreamer_zstd_compressor_free
51 static void astreamer_zstd_decompressor_content(
astreamer *streamer,
55 static void astreamer_zstd_decompressor_finalize(
astreamer *streamer);
56 static void astreamer_zstd_decompressor_free(
astreamer *streamer);
58 static const astreamer_ops astreamer_zstd_decompressor_ops = {
59 .
content = astreamer_zstd_decompressor_content,
60 .finalize = astreamer_zstd_decompressor_finalize,
61 .free = astreamer_zstd_decompressor_free
73 astreamer_zstd_frame *streamer;
78 streamer =
palloc0(
sizeof(astreamer_zstd_frame));
81 &astreamer_zstd_compressor_ops;
83 streamer->base.bbs_next =
next;
87 streamer->cctx = ZSTD_createCCtx();
89 pg_fatal(
"could not create zstd compression context");
92 ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
94 if (ZSTD_isError(ret))
95 pg_fatal(
"could not set zstd compression level to %d: %s",
96 compress->
level, ZSTD_getErrorName(ret));
106 ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
108 if (ZSTD_isError(ret))
109 pg_fatal(
"could not set compression worker count to %d: %s",
110 compress->
workers, ZSTD_getErrorName(ret));
115 ret = ZSTD_CCtx_setParameter(streamer->cctx,
116 ZSTD_c_enableLongDistanceMatching,
118 if (ZSTD_isError(ret))
121 ZSTD_getErrorName(ret));
127 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
128 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
129 streamer->zstd_outBuf.pos = 0;
131 return &streamer->base;
133 pg_fatal(
"this build does not support compression with %s",
"ZSTD");
149 astreamer_zstd_compressor_content(
astreamer *streamer,
154 astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
155 ZSTD_inBuffer inBuf = {
data,
len, 0};
157 while (inBuf.pos < inBuf.size)
160 size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
166 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
170 mystreamer->zstd_outBuf.dst,
171 mystreamer->zstd_outBuf.pos,
175 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
176 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
177 mystreamer->zstd_outBuf.pos = 0;
181 ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
182 &inBuf, ZSTD_e_continue);
184 if (ZSTD_isError(yet_to_flush))
186 ZSTD_getErrorName(yet_to_flush));
194 astreamer_zstd_compressor_finalize(
astreamer *streamer)
196 astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
201 ZSTD_inBuffer in = {NULL, 0, 0};
202 size_t max_needed = ZSTD_compressBound(0);
208 if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
212 mystreamer->zstd_outBuf.dst,
213 mystreamer->zstd_outBuf.pos,
217 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
218 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
219 mystreamer->zstd_outBuf.pos = 0;
222 yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
223 &mystreamer->zstd_outBuf,
226 if (ZSTD_isError(yet_to_flush))
228 ZSTD_getErrorName(yet_to_flush));
230 }
while (yet_to_flush > 0);
233 if (mystreamer->zstd_outBuf.pos > 0)
235 mystreamer->zstd_outBuf.dst,
236 mystreamer->zstd_outBuf.pos,
246 astreamer_zstd_compressor_free(
astreamer *streamer)
248 astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
251 ZSTD_freeCCtx(mystreamer->cctx);
265 astreamer_zstd_frame *streamer;
269 streamer =
palloc0(
sizeof(astreamer_zstd_frame));
271 &astreamer_zstd_decompressor_ops;
273 streamer->base.bbs_next =
next;
277 streamer->dctx = ZSTD_createDCtx();
279 pg_fatal(
"could not create zstd decompression context");
282 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
283 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
284 streamer->zstd_outBuf.pos = 0;
286 return &streamer->base;
288 pg_fatal(
"this build does not support compression with %s",
"ZSTD");
300 astreamer_zstd_decompressor_content(
astreamer *streamer,
305 astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
306 ZSTD_inBuffer inBuf = {
data,
len, 0};
308 while (inBuf.pos < inBuf.size)
316 if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
319 mystreamer->zstd_outBuf.dst,
320 mystreamer->zstd_outBuf.pos,
324 mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
325 mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
326 mystreamer->zstd_outBuf.pos = 0;
329 ret = ZSTD_decompressStream(mystreamer->dctx,
330 &mystreamer->zstd_outBuf, &inBuf);
332 if (ZSTD_isError(ret))
334 ZSTD_getErrorName(ret));
342 astreamer_zstd_decompressor_finalize(
astreamer *streamer)
344 astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
350 if (mystreamer->zstd_outBuf.pos > 0)
352 mystreamer->base.bbs_buffer.data,
353 mystreamer->base.bbs_buffer.maxlen,
363 astreamer_zstd_decompressor_free(
astreamer *streamer)
365 astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
368 ZSTD_freeDCtx(mystreamer->dctx);
static void astreamer_free(astreamer *streamer)
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
static void astreamer_finalize(astreamer *streamer)
astreamer_archive_context
astreamer * astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
astreamer * astreamer_zstd_decompressor_new(astreamer *next)
#define Assert(condition)
#define PG_COMPRESSION_OPTION_WORKERS
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
#define pg_log_error(...)
void pfree(void *pointer)
void * palloc0(Size size)
void enlargeStringInfo(StringInfo str, int needed)
void initStringInfo(StringInfo str)
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
StringInfoData bbs_buffer