26 typedef struct bbstreamer_lz4_frame
30 LZ4F_compressionContext_t cctx;
31 LZ4F_decompressionContext_t dctx;
32 LZ4F_preferences_t prefs;
36 } bbstreamer_lz4_frame;
38 static void bbstreamer_lz4_compressor_content(
bbstreamer *streamer,
42 static void bbstreamer_lz4_compressor_finalize(
bbstreamer *streamer);
43 static void bbstreamer_lz4_compressor_free(
bbstreamer *streamer);
46 .
content = bbstreamer_lz4_compressor_content,
47 .finalize = bbstreamer_lz4_compressor_finalize,
48 .free = bbstreamer_lz4_compressor_free
51 static void bbstreamer_lz4_decompressor_content(
bbstreamer *streamer,
55 static void bbstreamer_lz4_decompressor_finalize(
bbstreamer *streamer);
56 static void bbstreamer_lz4_decompressor_free(
bbstreamer *streamer);
59 .
content = bbstreamer_lz4_decompressor_content,
60 .finalize = bbstreamer_lz4_decompressor_finalize,
61 .free = bbstreamer_lz4_decompressor_free
73 bbstreamer_lz4_frame *streamer;
74 LZ4F_errorCode_t ctxError;
75 LZ4F_preferences_t *prefs;
79 streamer =
palloc0(
sizeof(bbstreamer_lz4_frame));
81 &bbstreamer_lz4_compressor_ops;
83 streamer->base.bbs_next =
next;
85 streamer->header_written =
false;
88 prefs = &streamer->prefs;
89 memset(prefs, 0,
sizeof(LZ4F_preferences_t));
90 prefs->frameInfo.blockSizeID = LZ4F_max256KB;
92 prefs->compressionLevel = compress->
level;
94 ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
95 if (LZ4F_isError(ctxError))
96 pg_log_error(
"could not create lz4 compression context: %s",
97 LZ4F_getErrorName(ctxError));
99 return &streamer->base;
101 pg_fatal(
"this build does not support lz4 compression");
117 bbstreamer_lz4_compressor_content(
bbstreamer *streamer,
122 bbstreamer_lz4_frame *mystreamer;
129 mystreamer = (bbstreamer_lz4_frame *) streamer;
133 if (!mystreamer->header_written)
135 compressed_size = LZ4F_compressBegin(mystreamer->cctx,
136 (
uint8 *) mystreamer->base.bbs_buffer.data,
137 mystreamer->base.bbs_buffer.maxlen,
140 if (LZ4F_isError(compressed_size))
142 LZ4F_getErrorName(compressed_size));
144 mystreamer->bytes_written += compressed_size;
145 mystreamer->header_written =
true;
152 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
153 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
160 out_bound = LZ4F_compressBound(
len, &mystreamer->prefs);
161 if (avail_out < out_bound)
164 mystreamer->base.bbs_buffer.data,
165 mystreamer->bytes_written,
169 if (mystreamer->base.bbs_buffer.maxlen < out_bound)
172 avail_out = mystreamer->base.bbs_buffer.maxlen;
173 mystreamer->bytes_written = 0;
174 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data;
185 compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
189 if (LZ4F_isError(compressed_size))
191 LZ4F_getErrorName(compressed_size));
193 mystreamer->bytes_written += compressed_size;
200 bbstreamer_lz4_compressor_finalize(
bbstreamer *streamer)
202 bbstreamer_lz4_frame *mystreamer;
208 mystreamer = (bbstreamer_lz4_frame *) streamer;
211 footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
212 if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
216 mystreamer->base.bbs_buffer.data,
217 mystreamer->bytes_written,
221 if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
224 avail_out = mystreamer->base.bbs_buffer.maxlen;
225 mystreamer->bytes_written = 0;
226 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data;
230 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
231 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
238 compressed_size = LZ4F_compressEnd(mystreamer->cctx,
239 next_out, avail_out, NULL);
241 if (LZ4F_isError(compressed_size))
243 LZ4F_getErrorName(compressed_size));
245 mystreamer->bytes_written += compressed_size;
248 mystreamer->base.bbs_buffer.data,
249 mystreamer->bytes_written,
259 bbstreamer_lz4_compressor_free(
bbstreamer *streamer)
261 bbstreamer_lz4_frame *mystreamer;
263 mystreamer = (bbstreamer_lz4_frame *) streamer;
265 LZ4F_freeCompressionContext(mystreamer->cctx);
279 bbstreamer_lz4_frame *streamer;
280 LZ4F_errorCode_t ctxError;
284 streamer =
palloc0(
sizeof(bbstreamer_lz4_frame));
286 &bbstreamer_lz4_decompressor_ops;
288 streamer->base.bbs_next =
next;
292 ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
293 if (LZ4F_isError(ctxError))
294 pg_fatal(
"could not initialize compression library: %s",
295 LZ4F_getErrorName(ctxError));
297 return &streamer->base;
299 pg_fatal(
"this build does not support lz4 compression");
311 bbstreamer_lz4_decompressor_content(
bbstreamer *streamer,
316 bbstreamer_lz4_frame *mystreamer;
322 mystreamer = (bbstreamer_lz4_frame *) streamer;
324 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data;
326 avail_out = mystreamer->base.bbs_buffer.maxlen;
334 read_size = avail_in;
335 out_size = avail_out;
350 ret = LZ4F_decompress(mystreamer->dctx,
352 next_in, &read_size, NULL);
354 if (LZ4F_isError(ret))
356 LZ4F_getErrorName(ret));
359 avail_in -= read_size;
360 next_in += read_size;
362 mystreamer->bytes_written += out_size;
368 if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
371 mystreamer->base.bbs_buffer.data,
372 mystreamer->base.bbs_buffer.maxlen,
375 avail_out = mystreamer->base.bbs_buffer.maxlen;
376 mystreamer->bytes_written = 0;
377 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data;
381 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
382 next_out += mystreamer->bytes_written;
391 bbstreamer_lz4_decompressor_finalize(
bbstreamer *streamer)
393 bbstreamer_lz4_frame *mystreamer;
395 mystreamer = (bbstreamer_lz4_frame *) streamer;
402 mystreamer->base.bbs_buffer.data,
403 mystreamer->base.bbs_buffer.maxlen,
413 bbstreamer_lz4_decompressor_free(
bbstreamer *streamer)
415 bbstreamer_lz4_frame *mystreamer;
417 mystreamer = (bbstreamer_lz4_frame *) streamer;
419 LZ4F_freeDecompressionContext(mystreamer->dctx);
static void bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
static void bbstreamer_finalize(bbstreamer *streamer)
bbstreamer_archive_context
static void bbstreamer_free(bbstreamer *streamer)
bbstreamer * bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compress)
bbstreamer * bbstreamer_lz4_decompressor_new(bbstreamer *next)
#define PG_COMPRESSION_OPTION_LEVEL
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
void pfree(void *pointer)
void * palloc0(Size size)
void enlargeStringInfo(StringInfo str, int needed)
void initStringInfo(StringInfo str)
void(* content)(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
StringInfoData bbs_buffer