30 typedef struct astreamer_lz4_frame
34 LZ4F_compressionContext_t cctx;
35 LZ4F_decompressionContext_t dctx;
36 LZ4F_preferences_t prefs;
40 } astreamer_lz4_frame;
42 static void astreamer_lz4_compressor_content(
astreamer *streamer,
46 static void astreamer_lz4_compressor_finalize(
astreamer *streamer);
47 static void astreamer_lz4_compressor_free(
astreamer *streamer);
50 .
content = astreamer_lz4_compressor_content,
51 .finalize = astreamer_lz4_compressor_finalize,
52 .free = astreamer_lz4_compressor_free
55 static void astreamer_lz4_decompressor_content(
astreamer *streamer,
59 static void astreamer_lz4_decompressor_finalize(
astreamer *streamer);
60 static void astreamer_lz4_decompressor_free(
astreamer *streamer);
63 .
content = astreamer_lz4_decompressor_content,
64 .finalize = astreamer_lz4_decompressor_finalize,
65 .free = astreamer_lz4_decompressor_free
77 astreamer_lz4_frame *streamer;
78 LZ4F_errorCode_t ctxError;
79 LZ4F_preferences_t *prefs;
83 streamer =
palloc0(
sizeof(astreamer_lz4_frame));
85 &astreamer_lz4_compressor_ops;
87 streamer->base.bbs_next =
next;
89 streamer->header_written =
false;
92 prefs = &streamer->prefs;
93 memset(prefs, 0,
sizeof(LZ4F_preferences_t));
94 prefs->frameInfo.blockSizeID = LZ4F_max256KB;
95 prefs->compressionLevel = compress->
level;
97 ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
98 if (LZ4F_isError(ctxError))
99 pg_log_error(
"could not create lz4 compression context: %s",
100 LZ4F_getErrorName(ctxError));
102 return &streamer->base;
104 pg_fatal(
"this build does not support compression with %s",
"LZ4");
120 astreamer_lz4_compressor_content(
astreamer *streamer,
125 astreamer_lz4_frame *mystreamer;
132 mystreamer = (astreamer_lz4_frame *) streamer;
136 if (!mystreamer->header_written)
138 compressed_size = LZ4F_compressBegin(mystreamer->cctx,
139 (
uint8 *) mystreamer->base.bbs_buffer.data,
140 mystreamer->base.bbs_buffer.maxlen,
143 if (LZ4F_isError(compressed_size))
145 LZ4F_getErrorName(compressed_size));
147 mystreamer->bytes_written += compressed_size;
148 mystreamer->header_written =
true;
155 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
156 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
163 out_bound = LZ4F_compressBound(
len, &mystreamer->prefs);
164 if (avail_out < out_bound)
167 mystreamer->base.bbs_buffer.data,
168 mystreamer->bytes_written,
172 if (mystreamer->base.bbs_buffer.maxlen < out_bound)
175 avail_out = mystreamer->base.bbs_buffer.maxlen;
176 mystreamer->bytes_written = 0;
177 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data;
188 compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
192 if (LZ4F_isError(compressed_size))
194 LZ4F_getErrorName(compressed_size));
196 mystreamer->bytes_written += compressed_size;
203 astreamer_lz4_compressor_finalize(
astreamer *streamer)
205 astreamer_lz4_frame *mystreamer;
211 mystreamer = (astreamer_lz4_frame *) streamer;
214 footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
215 if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
219 mystreamer->base.bbs_buffer.data,
220 mystreamer->bytes_written,
224 if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
227 avail_out = mystreamer->base.bbs_buffer.maxlen;
228 mystreamer->bytes_written = 0;
229 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data;
233 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
234 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
241 compressed_size = LZ4F_compressEnd(mystreamer->cctx,
242 next_out, avail_out, NULL);
244 if (LZ4F_isError(compressed_size))
246 LZ4F_getErrorName(compressed_size));
248 mystreamer->bytes_written += compressed_size;
251 mystreamer->base.bbs_buffer.data,
252 mystreamer->bytes_written,
262 astreamer_lz4_compressor_free(
astreamer *streamer)
264 astreamer_lz4_frame *mystreamer;
266 mystreamer = (astreamer_lz4_frame *) streamer;
268 LZ4F_freeCompressionContext(mystreamer->cctx);
282 astreamer_lz4_frame *streamer;
283 LZ4F_errorCode_t ctxError;
287 streamer =
palloc0(
sizeof(astreamer_lz4_frame));
289 &astreamer_lz4_decompressor_ops;
291 streamer->base.bbs_next =
next;
295 ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
296 if (LZ4F_isError(ctxError))
297 pg_fatal(
"could not initialize compression library: %s",
298 LZ4F_getErrorName(ctxError));
300 return &streamer->base;
302 pg_fatal(
"this build does not support compression with %s",
"LZ4");
314 astreamer_lz4_decompressor_content(
astreamer *streamer,
319 astreamer_lz4_frame *mystreamer;
325 mystreamer = (astreamer_lz4_frame *) streamer;
327 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data;
329 avail_out = mystreamer->base.bbs_buffer.maxlen;
337 read_size = avail_in;
338 out_size = avail_out;
353 ret = LZ4F_decompress(mystreamer->dctx,
355 next_in, &read_size, NULL);
357 if (LZ4F_isError(ret))
359 LZ4F_getErrorName(ret));
362 avail_in -= read_size;
363 next_in += read_size;
365 mystreamer->bytes_written += out_size;
371 if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
374 mystreamer->base.bbs_buffer.data,
375 mystreamer->base.bbs_buffer.maxlen,
378 avail_out = mystreamer->base.bbs_buffer.maxlen;
379 mystreamer->bytes_written = 0;
380 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data;
384 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
385 next_out += mystreamer->bytes_written;
394 astreamer_lz4_decompressor_finalize(
astreamer *streamer)
396 astreamer_lz4_frame *mystreamer;
398 mystreamer = (astreamer_lz4_frame *) streamer;
405 mystreamer->base.bbs_buffer.data,
406 mystreamer->base.bbs_buffer.maxlen,
416 astreamer_lz4_decompressor_free(
astreamer *streamer)
418 astreamer_lz4_frame *mystreamer;
420 mystreamer = (astreamer_lz4_frame *) streamer;
422 LZ4F_freeDecompressionContext(mystreamer->dctx);
static void astreamer_free(astreamer *streamer)
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
static void astreamer_finalize(astreamer *streamer)
astreamer_archive_context
astreamer * astreamer_lz4_decompressor_new(astreamer *next)
astreamer * astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
#define Assert(condition)
#define pg_log_error(...)
void pfree(void *pointer)
void * palloc0(Size size)
void enlargeStringInfo(StringInfo str, int needed)
void initStringInfo(StringInfo str)
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
StringInfoData bbs_buffer