PostgreSQL Source Code  git master
astreamer_lz4.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * astreamer_lz4.c
4  *
5  * Archive streamers that deal with data compressed using lz4.
6  * astreamer_lz4_compressor applies lz4 compression to the input stream,
7  * and astreamer_lz4_decompressor does the reverse.
8  *
9  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
10  *
11  * IDENTIFICATION
12  * src/bin/pg_basebackup/astreamer_lz4.c
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres_fe.h"
17 
18 #include <unistd.h>
19 
20 #ifdef USE_LZ4
21 #include <lz4frame.h>
22 #endif
23 
24 #include "common/file_perm.h"
25 #include "common/logging.h"
26 #include "common/string.h"
27 #include "fe_utils/astreamer.h"
28 
#ifdef USE_LZ4
/*
 * State for both the lz4 compressor and the lz4 decompressor streamers.
 *
 * The compressor creates and uses cctx, prefs and header_written; the
 * decompressor creates and uses dctx.  In both cases bytes_written counts the
 * bytes at the start of base.bbs_buffer that hold output not yet forwarded to
 * base.bbs_next.
 */
typedef struct astreamer_lz4_frame
{
	astreamer	base;

	LZ4F_compressionContext_t cctx; /* compressor only */
	LZ4F_decompressionContext_t dctx;	/* decompressor only */
	LZ4F_preferences_t prefs;	/* compressor only: frame preferences */

	size_t		bytes_written;	/* pending output bytes in bbs_buffer */
	bool		header_written; /* compressor only: frame header emitted? */
} astreamer_lz4_frame;

static void astreamer_lz4_compressor_content(astreamer *streamer,
											 astreamer_member *member,
											 const char *data, int len,
											 astreamer_archive_context context);
static void astreamer_lz4_compressor_finalize(astreamer *streamer);
static void astreamer_lz4_compressor_free(astreamer *streamer);

static const astreamer_ops astreamer_lz4_compressor_ops = {
	.content = astreamer_lz4_compressor_content,
	.finalize = astreamer_lz4_compressor_finalize,
	.free = astreamer_lz4_compressor_free
};

static void astreamer_lz4_decompressor_content(astreamer *streamer,
											   astreamer_member *member,
											   const char *data, int len,
											   astreamer_archive_context context);
static void astreamer_lz4_decompressor_finalize(astreamer *streamer);
static void astreamer_lz4_decompressor_free(astreamer *streamer);

static const astreamer_ops astreamer_lz4_decompressor_ops = {
	.content = astreamer_lz4_decompressor_content,
	.finalize = astreamer_lz4_decompressor_finalize,
	.free = astreamer_lz4_decompressor_free
};
#endif
68 
69 /*
70  * Create a new base backup streamer that performs lz4 compression of tar
71  * blocks.
72  */
73 astreamer *
75 {
76 #ifdef USE_LZ4
77  astreamer_lz4_frame *streamer;
78  LZ4F_errorCode_t ctxError;
79  LZ4F_preferences_t *prefs;
80 
81  Assert(next != NULL);
82 
83  streamer = palloc0(sizeof(astreamer_lz4_frame));
84  *((const astreamer_ops **) &streamer->base.bbs_ops) =
85  &astreamer_lz4_compressor_ops;
86 
87  streamer->base.bbs_next = next;
88  initStringInfo(&streamer->base.bbs_buffer);
89  streamer->header_written = false;
90 
91  /* Initialize stream compression preferences */
92  prefs = &streamer->prefs;
93  memset(prefs, 0, sizeof(LZ4F_preferences_t));
94  prefs->frameInfo.blockSizeID = LZ4F_max256KB;
95  prefs->compressionLevel = compress->level;
96 
97  ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
98  if (LZ4F_isError(ctxError))
99  pg_log_error("could not create lz4 compression context: %s",
100  LZ4F_getErrorName(ctxError));
101 
102  return &streamer->base;
103 #else
104  pg_fatal("this build does not support compression with %s", "LZ4");
105  return NULL; /* keep compiler quiet */
106 #endif
107 }
108 
109 #ifdef USE_LZ4
110 /*
111  * Compress the input data to output buffer.
112  *
113  * Find out the compression bound based on input data length for each
114  * invocation to make sure that output buffer has enough capacity to
115  * accommodate the compressed data. In case if the output buffer
116  * capacity falls short of compression bound then forward the content
117  * of output buffer to next streamer and empty the buffer.
118  */
119 static void
120 astreamer_lz4_compressor_content(astreamer *streamer,
121  astreamer_member *member,
122  const char *data, int len,
124 {
125  astreamer_lz4_frame *mystreamer;
126  uint8 *next_in,
127  *next_out;
128  size_t out_bound,
129  compressed_size,
130  avail_out;
131 
132  mystreamer = (astreamer_lz4_frame *) streamer;
133  next_in = (uint8 *) data;
134 
135  /* Write header before processing the first input chunk. */
136  if (!mystreamer->header_written)
137  {
138  compressed_size = LZ4F_compressBegin(mystreamer->cctx,
139  (uint8 *) mystreamer->base.bbs_buffer.data,
140  mystreamer->base.bbs_buffer.maxlen,
141  &mystreamer->prefs);
142 
143  if (LZ4F_isError(compressed_size))
144  pg_log_error("could not write lz4 header: %s",
145  LZ4F_getErrorName(compressed_size));
146 
147  mystreamer->bytes_written += compressed_size;
148  mystreamer->header_written = true;
149  }
150 
151  /*
152  * Update the offset and capacity of output buffer based on number of
153  * bytes written to output buffer.
154  */
155  next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
156  avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
157 
158  /*
159  * Find out the compression bound and make sure that output buffer has the
160  * required capacity for the success of LZ4F_compressUpdate. If needed
161  * forward the content to next streamer and empty the buffer.
162  */
163  out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
164  if (avail_out < out_bound)
165  {
166  astreamer_content(mystreamer->base.bbs_next, member,
167  mystreamer->base.bbs_buffer.data,
168  mystreamer->bytes_written,
169  context);
170 
171  /* Enlarge buffer if it falls short of out bound. */
172  if (mystreamer->base.bbs_buffer.maxlen < out_bound)
173  enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
174 
175  avail_out = mystreamer->base.bbs_buffer.maxlen;
176  mystreamer->bytes_written = 0;
177  next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
178  }
179 
180  /*
181  * This call compresses the data starting at next_in and generates the
182  * output starting at next_out. It expects the caller to provide the size
183  * of input buffer and capacity of output buffer by providing parameters
184  * len and avail_out.
185  *
186  * It returns the number of bytes compressed to output buffer.
187  */
188  compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
189  next_out, avail_out,
190  next_in, len, NULL);
191 
192  if (LZ4F_isError(compressed_size))
193  pg_log_error("could not compress data: %s",
194  LZ4F_getErrorName(compressed_size));
195 
196  mystreamer->bytes_written += compressed_size;
197 }
198 
199 /*
200  * End-of-stream processing.
201  */
202 static void
203 astreamer_lz4_compressor_finalize(astreamer *streamer)
204 {
205  astreamer_lz4_frame *mystreamer;
206  uint8 *next_out;
207  size_t footer_bound,
208  compressed_size,
209  avail_out;
210 
211  mystreamer = (astreamer_lz4_frame *) streamer;
212 
213  /* Find out the footer bound and update the output buffer. */
214  footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
215  if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
216  footer_bound)
217  {
218  astreamer_content(mystreamer->base.bbs_next, NULL,
219  mystreamer->base.bbs_buffer.data,
220  mystreamer->bytes_written,
222 
223  /* Enlarge buffer if it falls short of footer bound. */
224  if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
225  enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
226 
227  avail_out = mystreamer->base.bbs_buffer.maxlen;
228  mystreamer->bytes_written = 0;
229  next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
230  }
231  else
232  {
233  next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
234  avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
235  }
236 
237  /*
238  * Finalize the frame and flush whatever data remaining in compression
239  * context.
240  */
241  compressed_size = LZ4F_compressEnd(mystreamer->cctx,
242  next_out, avail_out, NULL);
243 
244  if (LZ4F_isError(compressed_size))
245  pg_log_error("could not end lz4 compression: %s",
246  LZ4F_getErrorName(compressed_size));
247 
248  mystreamer->bytes_written += compressed_size;
249 
250  astreamer_content(mystreamer->base.bbs_next, NULL,
251  mystreamer->base.bbs_buffer.data,
252  mystreamer->bytes_written,
254 
255  astreamer_finalize(mystreamer->base.bbs_next);
256 }
257 
258 /*
259  * Free memory.
260  */
261 static void
262 astreamer_lz4_compressor_free(astreamer *streamer)
263 {
264  astreamer_lz4_frame *mystreamer;
265 
266  mystreamer = (astreamer_lz4_frame *) streamer;
267  astreamer_free(streamer->bbs_next);
268  LZ4F_freeCompressionContext(mystreamer->cctx);
269  pfree(streamer->bbs_buffer.data);
270  pfree(streamer);
271 }
272 #endif
273 
274 /*
275  * Create a new base backup streamer that performs decompression of lz4
276  * compressed blocks.
277  */
278 astreamer *
280 {
281 #ifdef USE_LZ4
282  astreamer_lz4_frame *streamer;
283  LZ4F_errorCode_t ctxError;
284 
285  Assert(next != NULL);
286 
287  streamer = palloc0(sizeof(astreamer_lz4_frame));
288  *((const astreamer_ops **) &streamer->base.bbs_ops) =
289  &astreamer_lz4_decompressor_ops;
290 
291  streamer->base.bbs_next = next;
292  initStringInfo(&streamer->base.bbs_buffer);
293 
294  /* Initialize internal stream state for decompression */
295  ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
296  if (LZ4F_isError(ctxError))
297  pg_fatal("could not initialize compression library: %s",
298  LZ4F_getErrorName(ctxError));
299 
300  return &streamer->base;
301 #else
302  pg_fatal("this build does not support compression with %s", "LZ4");
303  return NULL; /* keep compiler quiet */
304 #endif
305 }
306 
307 #ifdef USE_LZ4
308 /*
309  * Decompress the input data to output buffer until we run out of input
310  * data. Each time the output buffer is full, pass on the decompressed data
311  * to the next streamer.
312  */
313 static void
314 astreamer_lz4_decompressor_content(astreamer *streamer,
315  astreamer_member *member,
316  const char *data, int len,
318 {
319  astreamer_lz4_frame *mystreamer;
320  uint8 *next_in,
321  *next_out;
322  size_t avail_in,
323  avail_out;
324 
325  mystreamer = (astreamer_lz4_frame *) streamer;
326  next_in = (uint8 *) data;
327  next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
328  avail_in = len;
329  avail_out = mystreamer->base.bbs_buffer.maxlen;
330 
331  while (avail_in > 0)
332  {
333  size_t ret,
334  read_size,
335  out_size;
336 
337  read_size = avail_in;
338  out_size = avail_out;
339 
340  /*
341  * This call decompresses the data starting at next_in and generates
342  * the output data starting at next_out. It expects the caller to
343  * provide size of the input buffer and total capacity of the output
344  * buffer by providing the read_size and out_size parameters
345  * respectively.
346  *
347  * Per the documentation of LZ4, parameters read_size and out_size
348  * behaves as dual parameters. On return, the number of bytes consumed
349  * from the input buffer will be written back to read_size and the
350  * number of bytes decompressed to output buffer will be written back
351  * to out_size respectively.
352  */
353  ret = LZ4F_decompress(mystreamer->dctx,
354  next_out, &out_size,
355  next_in, &read_size, NULL);
356 
357  if (LZ4F_isError(ret))
358  pg_log_error("could not decompress data: %s",
359  LZ4F_getErrorName(ret));
360 
361  /* Update input buffer based on number of bytes consumed */
362  avail_in -= read_size;
363  next_in += read_size;
364 
365  mystreamer->bytes_written += out_size;
366 
367  /*
368  * If output buffer is full then forward the content to next streamer
369  * and update the output buffer.
370  */
371  if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
372  {
373  astreamer_content(mystreamer->base.bbs_next, member,
374  mystreamer->base.bbs_buffer.data,
375  mystreamer->base.bbs_buffer.maxlen,
376  context);
377 
378  avail_out = mystreamer->base.bbs_buffer.maxlen;
379  mystreamer->bytes_written = 0;
380  next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
381  }
382  else
383  {
384  avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
385  next_out += mystreamer->bytes_written;
386  }
387  }
388 }
389 
390 /*
391  * End-of-stream processing.
392  */
393 static void
394 astreamer_lz4_decompressor_finalize(astreamer *streamer)
395 {
396  astreamer_lz4_frame *mystreamer;
397 
398  mystreamer = (astreamer_lz4_frame *) streamer;
399 
400  /*
401  * End of the stream, if there is some pending data in output buffers then
402  * we must forward it to next streamer.
403  */
404  astreamer_content(mystreamer->base.bbs_next, NULL,
405  mystreamer->base.bbs_buffer.data,
406  mystreamer->base.bbs_buffer.maxlen,
408 
409  astreamer_finalize(mystreamer->base.bbs_next);
410 }
411 
412 /*
413  * Free memory.
414  */
415 static void
416 astreamer_lz4_decompressor_free(astreamer *streamer)
417 {
418  astreamer_lz4_frame *mystreamer;
419 
420  mystreamer = (astreamer_lz4_frame *) streamer;
421  astreamer_free(streamer->bbs_next);
422  LZ4F_freeDecompressionContext(mystreamer->dctx);
423  pfree(streamer->bbs_buffer.data);
424  pfree(streamer);
425 }
426 #endif
static void astreamer_free(astreamer *streamer)
Definition: astreamer.h:153
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition: astreamer.h:135
static void astreamer_finalize(astreamer *streamer)
Definition: astreamer.h:145
astreamer_archive_context
Definition: astreamer.h:63
@ ASTREAMER_UNKNOWN
Definition: astreamer.h:64
astreamer * astreamer_lz4_decompressor_new(astreamer *next)
astreamer * astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
Definition: astreamer_lz4.c:74
static int32 next
Definition: blutils.c:222
#define Assert(condition)
Definition: c.h:849
unsigned char uint8
Definition: c.h:504
#define pg_log_error(...)
Definition: logging.h:106
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
#define pg_fatal(...)
const void size_t len
const void * data
while(p+4<=pend)
tree context
Definition: radixtree.h:1835
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:289
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition: astreamer.h:126
StringInfoData bbs_buffer
Definition: astreamer.h:111
astreamer * bbs_next
Definition: astreamer.h:110