28typedef struct astreamer_lz4_frame
32 LZ4F_compressionContext_t cctx;
33 LZ4F_decompressionContext_t dctx;
34 LZ4F_preferences_t prefs;
40static void astreamer_lz4_compressor_content(
astreamer *streamer,
44static void astreamer_lz4_compressor_finalize(
astreamer *streamer);
45static void astreamer_lz4_compressor_free(
astreamer *streamer);
48 .
content = astreamer_lz4_compressor_content,
49 .finalize = astreamer_lz4_compressor_finalize,
50 .free = astreamer_lz4_compressor_free
53static void astreamer_lz4_decompressor_content(
astreamer *streamer,
57static void astreamer_lz4_decompressor_finalize(
astreamer *streamer);
58static void astreamer_lz4_decompressor_free(
astreamer *streamer);
61 .
content = astreamer_lz4_decompressor_content,
62 .finalize = astreamer_lz4_decompressor_finalize,
63 .free = astreamer_lz4_decompressor_free
75 astreamer_lz4_frame *streamer;
76 LZ4F_errorCode_t ctxError;
77 LZ4F_preferences_t *prefs;
81 streamer =
palloc0(
sizeof(astreamer_lz4_frame));
83 &astreamer_lz4_compressor_ops;
85 streamer->base.bbs_next =
next;
87 streamer->header_written =
false;
90 prefs = &streamer->prefs;
91 memset(prefs, 0,
sizeof(LZ4F_preferences_t));
92 prefs->frameInfo.blockSizeID = LZ4F_max256KB;
93 prefs->compressionLevel = compress->
level;
95 ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
96 if (LZ4F_isError(ctxError))
97 pg_log_error(
"could not create lz4 compression context: %s",
98 LZ4F_getErrorName(ctxError));
100 return &streamer->base;
102 pg_fatal(
"this build does not support compression with %s",
"LZ4");
118astreamer_lz4_compressor_content(
astreamer *streamer,
123 astreamer_lz4_frame *mystreamer;
130 mystreamer = (astreamer_lz4_frame *) streamer;
134 if (!mystreamer->header_written)
136 compressed_size = LZ4F_compressBegin(mystreamer->cctx,
137 (
uint8 *) mystreamer->base.bbs_buffer.data,
138 mystreamer->base.bbs_buffer.maxlen,
141 if (LZ4F_isError(compressed_size))
143 LZ4F_getErrorName(compressed_size));
145 mystreamer->bytes_written += compressed_size;
146 mystreamer->header_written =
true;
153 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
154 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
161 out_bound = LZ4F_compressBound(
len, &mystreamer->prefs);
162 if (avail_out < out_bound)
165 mystreamer->base.bbs_buffer.data,
166 mystreamer->bytes_written,
170 if (mystreamer->base.bbs_buffer.maxlen < out_bound)
173 avail_out = mystreamer->base.bbs_buffer.maxlen;
174 mystreamer->bytes_written = 0;
175 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data;
186 compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
190 if (LZ4F_isError(compressed_size))
192 LZ4F_getErrorName(compressed_size));
194 mystreamer->bytes_written += compressed_size;
201astreamer_lz4_compressor_finalize(
astreamer *streamer)
203 astreamer_lz4_frame *mystreamer;
209 mystreamer = (astreamer_lz4_frame *) streamer;
212 footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
213 if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
217 mystreamer->base.bbs_buffer.data,
218 mystreamer->bytes_written,
222 if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
225 avail_out = mystreamer->base.bbs_buffer.maxlen;
226 mystreamer->bytes_written = 0;
227 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data;
231 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
232 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
239 compressed_size = LZ4F_compressEnd(mystreamer->cctx,
240 next_out, avail_out, NULL);
242 if (LZ4F_isError(compressed_size))
244 LZ4F_getErrorName(compressed_size));
246 mystreamer->bytes_written += compressed_size;
249 mystreamer->base.bbs_buffer.data,
250 mystreamer->bytes_written,
260astreamer_lz4_compressor_free(
astreamer *streamer)
262 astreamer_lz4_frame *mystreamer;
264 mystreamer = (astreamer_lz4_frame *) streamer;
266 LZ4F_freeCompressionContext(mystreamer->cctx);
280 astreamer_lz4_frame *streamer;
281 LZ4F_errorCode_t ctxError;
285 streamer =
palloc0(
sizeof(astreamer_lz4_frame));
287 &astreamer_lz4_decompressor_ops;
289 streamer->base.bbs_next =
next;
293 ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
294 if (LZ4F_isError(ctxError))
295 pg_fatal(
"could not initialize compression library: %s",
296 LZ4F_getErrorName(ctxError));
298 return &streamer->base;
300 pg_fatal(
"this build does not support compression with %s",
"LZ4");
312astreamer_lz4_decompressor_content(
astreamer *streamer,
317 astreamer_lz4_frame *mystreamer;
323 mystreamer = (astreamer_lz4_frame *) streamer;
325 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data;
327 avail_out = mystreamer->base.bbs_buffer.maxlen;
335 read_size = avail_in;
336 out_size = avail_out;
351 ret = LZ4F_decompress(mystreamer->dctx,
353 next_in, &read_size, NULL);
355 if (LZ4F_isError(ret))
357 LZ4F_getErrorName(ret));
360 avail_in -= read_size;
361 next_in += read_size;
363 mystreamer->bytes_written += out_size;
369 if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
372 mystreamer->base.bbs_buffer.data,
373 mystreamer->base.bbs_buffer.maxlen,
376 avail_out = mystreamer->base.bbs_buffer.maxlen;
377 mystreamer->bytes_written = 0;
378 next_out = (
uint8 *) mystreamer->base.bbs_buffer.data;
382 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
383 next_out += mystreamer->bytes_written;
392astreamer_lz4_decompressor_finalize(
astreamer *streamer)
394 astreamer_lz4_frame *mystreamer;
396 mystreamer = (astreamer_lz4_frame *) streamer;
403 mystreamer->base.bbs_buffer.data,
404 mystreamer->base.bbs_buffer.maxlen,
414astreamer_lz4_decompressor_free(
astreamer *streamer)
416 astreamer_lz4_frame *mystreamer;
418 mystreamer = (astreamer_lz4_frame *) streamer;
420 LZ4F_freeDecompressionContext(mystreamer->dctx);
static void astreamer_free(astreamer *streamer)
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
static void astreamer_finalize(astreamer *streamer)
astreamer_archive_context
astreamer * astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
astreamer * astreamer_lz4_decompressor_new(astreamer *next)
Assert(PointerIsAligned(start, uint64))
#define pg_log_error(...)
void pfree(void *pointer)
void * palloc0(Size size)
void enlargeStringInfo(StringInfo str, int needed)
void initStringInfo(StringInfo str)
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
StringInfoData bbs_buffer