PostgreSQL Source Code git master
astreamer_lz4.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * astreamer_lz4.c
4 *
5 * Archive streamers that deal with data compressed using lz4.
6 * astreamer_lz4_compressor applies lz4 compression to the input stream,
7 * and astreamer_lz4_decompressor does the reverse.
8 *
9 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
10 *
11 * IDENTIFICATION
12 * src/fe_utils/astreamer_lz4.c
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres_fe.h"
17
18#include <unistd.h>
19
20#ifdef USE_LZ4
21#include <lz4frame.h>
22#endif
23
24#include "common/logging.h"
25#include "fe_utils/astreamer.h"
26
27#ifdef USE_LZ4
28typedef struct astreamer_lz4_frame
29{
30 astreamer base;
31
32 LZ4F_compressionContext_t cctx;
33 LZ4F_decompressionContext_t dctx;
34 LZ4F_preferences_t prefs;
35
36 size_t bytes_written;
37 bool header_written;
38} astreamer_lz4_frame;
39
40static void astreamer_lz4_compressor_content(astreamer *streamer,
41 astreamer_member *member,
42 const char *data, int len,
44static void astreamer_lz4_compressor_finalize(astreamer *streamer);
45static void astreamer_lz4_compressor_free(astreamer *streamer);
46
47static const astreamer_ops astreamer_lz4_compressor_ops = {
48 .content = astreamer_lz4_compressor_content,
49 .finalize = astreamer_lz4_compressor_finalize,
50 .free = astreamer_lz4_compressor_free
51};
52
53static void astreamer_lz4_decompressor_content(astreamer *streamer,
54 astreamer_member *member,
55 const char *data, int len,
57static void astreamer_lz4_decompressor_finalize(astreamer *streamer);
58static void astreamer_lz4_decompressor_free(astreamer *streamer);
59
60static const astreamer_ops astreamer_lz4_decompressor_ops = {
61 .content = astreamer_lz4_decompressor_content,
62 .finalize = astreamer_lz4_decompressor_finalize,
63 .free = astreamer_lz4_decompressor_free
64};
65#endif
66
67/*
68 * Create a new base backup streamer that performs lz4 compression of tar
69 * blocks.
70 */
73{
74#ifdef USE_LZ4
75 astreamer_lz4_frame *streamer;
76 LZ4F_errorCode_t ctxError;
77 LZ4F_preferences_t *prefs;
78
79 Assert(next != NULL);
80
81 streamer = palloc0(sizeof(astreamer_lz4_frame));
82 *((const astreamer_ops **) &streamer->base.bbs_ops) =
83 &astreamer_lz4_compressor_ops;
84
85 streamer->base.bbs_next = next;
86 initStringInfo(&streamer->base.bbs_buffer);
87 streamer->header_written = false;
88
89 /* Initialize stream compression preferences */
90 prefs = &streamer->prefs;
91 memset(prefs, 0, sizeof(LZ4F_preferences_t));
92 prefs->frameInfo.blockSizeID = LZ4F_max256KB;
93 prefs->compressionLevel = compress->level;
94
95 ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
96 if (LZ4F_isError(ctxError))
97 pg_log_error("could not create lz4 compression context: %s",
98 LZ4F_getErrorName(ctxError));
99
100 return &streamer->base;
101#else
102 pg_fatal("this build does not support compression with %s", "LZ4");
103 return NULL; /* keep compiler quiet */
104#endif
105}
106
107#ifdef USE_LZ4
108/*
109 * Compress the input data to output buffer.
110 *
111 * Find out the compression bound based on input data length for each
112 * invocation to make sure that output buffer has enough capacity to
113 * accommodate the compressed data. In case if the output buffer
114 * capacity falls short of compression bound then forward the content
115 * of output buffer to next streamer and empty the buffer.
116 */
117static void
118astreamer_lz4_compressor_content(astreamer *streamer,
119 astreamer_member *member,
120 const char *data, int len,
122{
123 astreamer_lz4_frame *mystreamer;
124 uint8 *next_in,
125 *next_out;
126 size_t out_bound,
127 compressed_size,
128 avail_out;
129
130 mystreamer = (astreamer_lz4_frame *) streamer;
131 next_in = (uint8 *) data;
132
133 /* Write header before processing the first input chunk. */
134 if (!mystreamer->header_written)
135 {
136 compressed_size = LZ4F_compressBegin(mystreamer->cctx,
137 (uint8 *) mystreamer->base.bbs_buffer.data,
138 mystreamer->base.bbs_buffer.maxlen,
139 &mystreamer->prefs);
140
141 if (LZ4F_isError(compressed_size))
142 pg_log_error("could not write lz4 header: %s",
143 LZ4F_getErrorName(compressed_size));
144
145 mystreamer->bytes_written += compressed_size;
146 mystreamer->header_written = true;
147 }
148
149 /*
150 * Update the offset and capacity of output buffer based on number of
151 * bytes written to output buffer.
152 */
153 next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
154 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
155
156 /*
157 * Find out the compression bound and make sure that output buffer has the
158 * required capacity for the success of LZ4F_compressUpdate. If needed
159 * forward the content to next streamer and empty the buffer.
160 */
161 out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
162 if (avail_out < out_bound)
163 {
164 astreamer_content(mystreamer->base.bbs_next, member,
165 mystreamer->base.bbs_buffer.data,
166 mystreamer->bytes_written,
167 context);
168
169 /* Enlarge buffer if it falls short of out bound. */
170 if (mystreamer->base.bbs_buffer.maxlen < out_bound)
171 enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
172
173 avail_out = mystreamer->base.bbs_buffer.maxlen;
174 mystreamer->bytes_written = 0;
175 next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
176 }
177
178 /*
179 * This call compresses the data starting at next_in and generates the
180 * output starting at next_out. It expects the caller to provide the size
181 * of input buffer and capacity of output buffer by providing parameters
182 * len and avail_out.
183 *
184 * It returns the number of bytes compressed to output buffer.
185 */
186 compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
187 next_out, avail_out,
188 next_in, len, NULL);
189
190 if (LZ4F_isError(compressed_size))
191 pg_log_error("could not compress data: %s",
192 LZ4F_getErrorName(compressed_size));
193
194 mystreamer->bytes_written += compressed_size;
195}
196
197/*
198 * End-of-stream processing.
199 */
200static void
201astreamer_lz4_compressor_finalize(astreamer *streamer)
202{
203 astreamer_lz4_frame *mystreamer;
204 uint8 *next_out;
205 size_t footer_bound,
206 compressed_size,
207 avail_out;
208
209 mystreamer = (astreamer_lz4_frame *) streamer;
210
211 /* Find out the footer bound and update the output buffer. */
212 footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
213 if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
214 footer_bound)
215 {
216 astreamer_content(mystreamer->base.bbs_next, NULL,
217 mystreamer->base.bbs_buffer.data,
218 mystreamer->bytes_written,
220
221 /* Enlarge buffer if it falls short of footer bound. */
222 if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
223 enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
224
225 avail_out = mystreamer->base.bbs_buffer.maxlen;
226 mystreamer->bytes_written = 0;
227 next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
228 }
229 else
230 {
231 next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
232 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
233 }
234
235 /*
236 * Finalize the frame and flush whatever data remaining in compression
237 * context.
238 */
239 compressed_size = LZ4F_compressEnd(mystreamer->cctx,
240 next_out, avail_out, NULL);
241
242 if (LZ4F_isError(compressed_size))
243 pg_log_error("could not end lz4 compression: %s",
244 LZ4F_getErrorName(compressed_size));
245
246 mystreamer->bytes_written += compressed_size;
247
248 astreamer_content(mystreamer->base.bbs_next, NULL,
249 mystreamer->base.bbs_buffer.data,
250 mystreamer->bytes_written,
252
253 astreamer_finalize(mystreamer->base.bbs_next);
254}
255
256/*
257 * Free memory.
258 */
259static void
260astreamer_lz4_compressor_free(astreamer *streamer)
261{
262 astreamer_lz4_frame *mystreamer;
263
264 mystreamer = (astreamer_lz4_frame *) streamer;
265 astreamer_free(streamer->bbs_next);
266 LZ4F_freeCompressionContext(mystreamer->cctx);
267 pfree(streamer->bbs_buffer.data);
268 pfree(streamer);
269}
270#endif
271
272/*
273 * Create a new base backup streamer that performs decompression of lz4
274 * compressed blocks.
275 */
276astreamer *
278{
279#ifdef USE_LZ4
280 astreamer_lz4_frame *streamer;
281 LZ4F_errorCode_t ctxError;
282
283 Assert(next != NULL);
284
285 streamer = palloc0(sizeof(astreamer_lz4_frame));
286 *((const astreamer_ops **) &streamer->base.bbs_ops) =
287 &astreamer_lz4_decompressor_ops;
288
289 streamer->base.bbs_next = next;
290 initStringInfo(&streamer->base.bbs_buffer);
291
292 /* Initialize internal stream state for decompression */
293 ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
294 if (LZ4F_isError(ctxError))
295 pg_fatal("could not initialize compression library: %s",
296 LZ4F_getErrorName(ctxError));
297
298 return &streamer->base;
299#else
300 pg_fatal("this build does not support compression with %s", "LZ4");
301 return NULL; /* keep compiler quiet */
302#endif
303}
304
305#ifdef USE_LZ4
306/*
307 * Decompress the input data to output buffer until we run out of input
308 * data. Each time the output buffer is full, pass on the decompressed data
309 * to the next streamer.
310 */
311static void
312astreamer_lz4_decompressor_content(astreamer *streamer,
313 astreamer_member *member,
314 const char *data, int len,
316{
317 astreamer_lz4_frame *mystreamer;
318 uint8 *next_in,
319 *next_out;
320 size_t avail_in,
321 avail_out;
322
323 mystreamer = (astreamer_lz4_frame *) streamer;
324 next_in = (uint8 *) data;
325 next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
326 avail_in = len;
327 avail_out = mystreamer->base.bbs_buffer.maxlen;
328
329 while (avail_in > 0)
330 {
331 size_t ret,
332 read_size,
333 out_size;
334
335 read_size = avail_in;
336 out_size = avail_out;
337
338 /*
339 * This call decompresses the data starting at next_in and generates
340 * the output data starting at next_out. It expects the caller to
341 * provide size of the input buffer and total capacity of the output
342 * buffer by providing the read_size and out_size parameters
343 * respectively.
344 *
345 * Per the documentation of LZ4, parameters read_size and out_size
346 * behaves as dual parameters. On return, the number of bytes consumed
347 * from the input buffer will be written back to read_size and the
348 * number of bytes decompressed to output buffer will be written back
349 * to out_size respectively.
350 */
351 ret = LZ4F_decompress(mystreamer->dctx,
352 next_out, &out_size,
353 next_in, &read_size, NULL);
354
355 if (LZ4F_isError(ret))
356 pg_log_error("could not decompress data: %s",
357 LZ4F_getErrorName(ret));
358
359 /* Update input buffer based on number of bytes consumed */
360 avail_in -= read_size;
361 next_in += read_size;
362
363 mystreamer->bytes_written += out_size;
364
365 /*
366 * If output buffer is full then forward the content to next streamer
367 * and update the output buffer.
368 */
369 if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
370 {
371 astreamer_content(mystreamer->base.bbs_next, member,
372 mystreamer->base.bbs_buffer.data,
373 mystreamer->base.bbs_buffer.maxlen,
374 context);
375
376 avail_out = mystreamer->base.bbs_buffer.maxlen;
377 mystreamer->bytes_written = 0;
378 next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
379 }
380 else
381 {
382 avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
383 next_out += mystreamer->bytes_written;
384 }
385 }
386}
387
388/*
389 * End-of-stream processing.
390 */
391static void
392astreamer_lz4_decompressor_finalize(astreamer *streamer)
393{
394 astreamer_lz4_frame *mystreamer;
395
396 mystreamer = (astreamer_lz4_frame *) streamer;
397
398 /*
399 * End of the stream, if there is some pending data in output buffers then
400 * we must forward it to next streamer.
401 */
402 astreamer_content(mystreamer->base.bbs_next, NULL,
403 mystreamer->base.bbs_buffer.data,
404 mystreamer->base.bbs_buffer.maxlen,
406
407 astreamer_finalize(mystreamer->base.bbs_next);
408}
409
410/*
411 * Free memory.
412 */
413static void
414astreamer_lz4_decompressor_free(astreamer *streamer)
415{
416 astreamer_lz4_frame *mystreamer;
417
418 mystreamer = (astreamer_lz4_frame *) streamer;
419 astreamer_free(streamer->bbs_next);
420 LZ4F_freeDecompressionContext(mystreamer->dctx);
421 pfree(streamer->bbs_buffer.data);
422 pfree(streamer);
423}
424#endif
static void astreamer_free(astreamer *streamer)
Definition: astreamer.h:153
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition: astreamer.h:135
static void astreamer_finalize(astreamer *streamer)
Definition: astreamer.h:145
astreamer_archive_context
Definition: astreamer.h:63
@ ASTREAMER_UNKNOWN
Definition: astreamer.h:64
astreamer * astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
Definition: astreamer_lz4.c:72
astreamer * astreamer_lz4_decompressor_new(astreamer *next)
static int32 next
Definition: blutils.c:224
uint8_t uint8
Definition: c.h:500
Assert(PointerIsAligned(start, uint64))
#define pg_log_error(...)
Definition: logging.h:106
void pfree(void *pointer)
Definition: mcxt.c:1524
void * palloc0(Size size)
Definition: mcxt.c:1347
#define pg_fatal(...)
const void size_t len
const void * data
while(p+4<=pend)
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:337
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition: astreamer.h:126
StringInfoData bbs_buffer
Definition: astreamer.h:111
astreamer * bbs_next
Definition: astreamer.h:110