PostgreSQL Source Code  git master
bbstreamer_lz4.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * bbstreamer_lz4.c
4  *
5  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/bin/pg_basebackup/bbstreamer_lz4.c
9  *-------------------------------------------------------------------------
10  */
11 
12 #include "postgres_fe.h"
13 
14 #include <unistd.h>
15 
16 #ifdef USE_LZ4
17 #include <lz4frame.h>
18 #endif
19 
20 #include "bbstreamer.h"
21 #include "common/logging.h"
22 #include "common/file_perm.h"
23 #include "common/string.h"
24 
#ifdef USE_LZ4
/*
 * State shared by the LZ4 compressor and decompressor streamers.
 *
 * The compressor uses cctx, prefs and header_written; the decompressor uses
 * dctx.  In both directions, bytes_written is the number of output bytes
 * currently pending in base.bbs_buffer that have not yet been forwarded to
 * the next streamer.
 */
typedef struct bbstreamer_lz4_frame
{
	bbstreamer	base;

	LZ4F_compressionContext_t cctx; /* compressor only */
	LZ4F_decompressionContext_t dctx;	/* decompressor only */
	LZ4F_preferences_t prefs;	/* compressor frame preferences */

	size_t		bytes_written;	/* pending bytes in base.bbs_buffer */
	bool		header_written; /* compressor: frame header emitted yet? */
} bbstreamer_lz4_frame;

static void bbstreamer_lz4_compressor_content(bbstreamer *streamer,
											  bbstreamer_member *member,
											  const char *data, int len,
											  bbstreamer_archive_context context);
static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer);
static void bbstreamer_lz4_compressor_free(bbstreamer *streamer);

const bbstreamer_ops bbstreamer_lz4_compressor_ops = {
	.content = bbstreamer_lz4_compressor_content,
	.finalize = bbstreamer_lz4_compressor_finalize,
	.free = bbstreamer_lz4_compressor_free
};

static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
												bbstreamer_member *member,
												const char *data, int len,
												bbstreamer_archive_context context);
static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer);
static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer);

const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
	.content = bbstreamer_lz4_decompressor_content,
	.finalize = bbstreamer_lz4_decompressor_finalize,
	.free = bbstreamer_lz4_decompressor_free
};
#endif
64 
65 /*
66  * Create a new base backup streamer that performs lz4 compression of tar
67  * blocks.
68  */
69 bbstreamer *
71 {
72 #ifdef USE_LZ4
73  bbstreamer_lz4_frame *streamer;
74  LZ4F_errorCode_t ctxError;
75  LZ4F_preferences_t *prefs;
76 
77  Assert(next != NULL);
78 
79  streamer = palloc0(sizeof(bbstreamer_lz4_frame));
80  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
81  &bbstreamer_lz4_compressor_ops;
82 
83  streamer->base.bbs_next = next;
84  initStringInfo(&streamer->base.bbs_buffer);
85  streamer->header_written = false;
86 
87  /* Initialize stream compression preferences */
88  prefs = &streamer->prefs;
89  memset(prefs, 0, sizeof(LZ4F_preferences_t));
90  prefs->frameInfo.blockSizeID = LZ4F_max256KB;
91  if ((compress->options & PG_COMPRESSION_OPTION_LEVEL) != 0)
92  prefs->compressionLevel = compress->level;
93 
94  ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
95  if (LZ4F_isError(ctxError))
96  pg_log_error("could not create lz4 compression context: %s",
97  LZ4F_getErrorName(ctxError));
98 
99  return &streamer->base;
100 #else
101  pg_fatal("this build does not support lz4 compression");
102  return NULL; /* keep compiler quiet */
103 #endif
104 }
105 
106 #ifdef USE_LZ4
107 /*
108  * Compress the input data to output buffer.
109  *
110  * Find out the compression bound based on input data length for each
111  * invocation to make sure that output buffer has enough capacity to
112  * accommodate the compressed data. In case if the output buffer
113  * capacity falls short of compression bound then forward the content
114  * of output buffer to next streamer and empty the buffer.
115  */
116 static void
117 bbstreamer_lz4_compressor_content(bbstreamer *streamer,
118  bbstreamer_member *member,
119  const char *data, int len,
121 {
122  bbstreamer_lz4_frame *mystreamer;
123  uint8 *next_in,
124  *next_out;
125  size_t out_bound,
126  compressed_size,
127  avail_out;
128 
129  mystreamer = (bbstreamer_lz4_frame *) streamer;
130  next_in = (uint8 *) data;
131 
132  /* Write header before processing the first input chunk. */
133  if (!mystreamer->header_written)
134  {
135  compressed_size = LZ4F_compressBegin(mystreamer->cctx,
136  (uint8 *) mystreamer->base.bbs_buffer.data,
137  mystreamer->base.bbs_buffer.maxlen,
138  &mystreamer->prefs);
139 
140  if (LZ4F_isError(compressed_size))
141  pg_log_error("could not write lz4 header: %s",
142  LZ4F_getErrorName(compressed_size));
143 
144  mystreamer->bytes_written += compressed_size;
145  mystreamer->header_written = true;
146  }
147 
148  /*
149  * Update the offset and capacity of output buffer based on number of
150  * bytes written to output buffer.
151  */
152  next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
153  avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
154 
155  /*
156  * Find out the compression bound and make sure that output buffer has the
157  * required capacity for the success of LZ4F_compressUpdate. If needed
158  * forward the content to next streamer and empty the buffer.
159  */
160  out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
161  if (avail_out < out_bound)
162  {
163  bbstreamer_content(mystreamer->base.bbs_next, member,
164  mystreamer->base.bbs_buffer.data,
165  mystreamer->bytes_written,
166  context);
167 
168  /* Enlarge buffer if it falls short of out bound. */
169  if (mystreamer->base.bbs_buffer.maxlen < out_bound)
170  enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
171 
172  avail_out = mystreamer->base.bbs_buffer.maxlen;
173  mystreamer->bytes_written = 0;
174  next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
175  }
176 
177  /*
178  * This call compresses the data starting at next_in and generates the
179  * output starting at next_out. It expects the caller to provide the size
180  * of input buffer and capacity of output buffer by providing parameters
181  * len and avail_out.
182  *
183  * It returns the number of bytes compressed to output buffer.
184  */
185  compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
186  next_out, avail_out,
187  next_in, len, NULL);
188 
189  if (LZ4F_isError(compressed_size))
190  pg_log_error("could not compress data: %s",
191  LZ4F_getErrorName(compressed_size));
192 
193  mystreamer->bytes_written += compressed_size;
194 }
195 
196 /*
197  * End-of-stream processing.
198  */
199 static void
200 bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
201 {
202  bbstreamer_lz4_frame *mystreamer;
203  uint8 *next_out;
204  size_t footer_bound,
205  compressed_size,
206  avail_out;
207 
208  mystreamer = (bbstreamer_lz4_frame *) streamer;
209 
210  /* Find out the footer bound and update the output buffer. */
211  footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
212  if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
213  footer_bound)
214  {
215  bbstreamer_content(mystreamer->base.bbs_next, NULL,
216  mystreamer->base.bbs_buffer.data,
217  mystreamer->bytes_written,
219 
220  /* Enlarge buffer if it falls short of footer bound. */
221  if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
222  enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
223 
224  avail_out = mystreamer->base.bbs_buffer.maxlen;
225  mystreamer->bytes_written = 0;
226  next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
227  }
228  else
229  {
230  next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
231  avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
232  }
233 
234  /*
235  * Finalize the frame and flush whatever data remaining in compression
236  * context.
237  */
238  compressed_size = LZ4F_compressEnd(mystreamer->cctx,
239  next_out, avail_out, NULL);
240 
241  if (LZ4F_isError(compressed_size))
242  pg_log_error("could not end lz4 compression: %s",
243  LZ4F_getErrorName(compressed_size));
244 
245  mystreamer->bytes_written += compressed_size;
246 
247  bbstreamer_content(mystreamer->base.bbs_next, NULL,
248  mystreamer->base.bbs_buffer.data,
249  mystreamer->bytes_written,
251 
252  bbstreamer_finalize(mystreamer->base.bbs_next);
253 }
254 
255 /*
256  * Free memory.
257  */
258 static void
259 bbstreamer_lz4_compressor_free(bbstreamer *streamer)
260 {
261  bbstreamer_lz4_frame *mystreamer;
262 
263  mystreamer = (bbstreamer_lz4_frame *) streamer;
264  bbstreamer_free(streamer->bbs_next);
265  LZ4F_freeCompressionContext(mystreamer->cctx);
266  pfree(streamer->bbs_buffer.data);
267  pfree(streamer);
268 }
269 #endif
270 
271 /*
272  * Create a new base backup streamer that performs decompression of lz4
273  * compressed blocks.
274  */
275 bbstreamer *
277 {
278 #ifdef USE_LZ4
279  bbstreamer_lz4_frame *streamer;
280  LZ4F_errorCode_t ctxError;
281 
282  Assert(next != NULL);
283 
284  streamer = palloc0(sizeof(bbstreamer_lz4_frame));
285  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
286  &bbstreamer_lz4_decompressor_ops;
287 
288  streamer->base.bbs_next = next;
289  initStringInfo(&streamer->base.bbs_buffer);
290 
291  /* Initialize internal stream state for decompression */
292  ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
293  if (LZ4F_isError(ctxError))
294  pg_fatal("could not initialize compression library: %s",
295  LZ4F_getErrorName(ctxError));
296 
297  return &streamer->base;
298 #else
299  pg_fatal("this build does not support lz4 compression");
300  return NULL; /* keep compiler quiet */
301 #endif
302 }
303 
304 #ifdef USE_LZ4
305 /*
306  * Decompress the input data to output buffer until we run out of input
307  * data. Each time the output buffer is full, pass on the decompressed data
308  * to the next streamer.
309  */
310 static void
311 bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
312  bbstreamer_member *member,
313  const char *data, int len,
315 {
316  bbstreamer_lz4_frame *mystreamer;
317  uint8 *next_in,
318  *next_out;
319  size_t avail_in,
320  avail_out;
321 
322  mystreamer = (bbstreamer_lz4_frame *) streamer;
323  next_in = (uint8 *) data;
324  next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
325  avail_in = len;
326  avail_out = mystreamer->base.bbs_buffer.maxlen;
327 
328  while (avail_in > 0)
329  {
330  size_t ret,
331  read_size,
332  out_size;
333 
334  read_size = avail_in;
335  out_size = avail_out;
336 
337  /*
338  * This call decompresses the data starting at next_in and generates
339  * the output data starting at next_out. It expects the caller to
340  * provide size of the input buffer and total capacity of the output
341  * buffer by providing the read_size and out_size parameters
342  * respectively.
343  *
344  * Per the documentation of LZ4, parameters read_size and out_size
345  * behaves as dual parameters. On return, the number of bytes consumed
346  * from the input buffer will be written back to read_size and the
347  * number of bytes decompressed to output buffer will be written back
348  * to out_size respectively.
349  */
350  ret = LZ4F_decompress(mystreamer->dctx,
351  next_out, &out_size,
352  next_in, &read_size, NULL);
353 
354  if (LZ4F_isError(ret))
355  pg_log_error("could not decompress data: %s",
356  LZ4F_getErrorName(ret));
357 
358  /* Update input buffer based on number of bytes consumed */
359  avail_in -= read_size;
360  next_in += read_size;
361 
362  mystreamer->bytes_written += out_size;
363 
364  /*
365  * If output buffer is full then forward the content to next streamer
366  * and update the output buffer.
367  */
368  if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
369  {
370  bbstreamer_content(mystreamer->base.bbs_next, member,
371  mystreamer->base.bbs_buffer.data,
372  mystreamer->base.bbs_buffer.maxlen,
373  context);
374 
375  avail_out = mystreamer->base.bbs_buffer.maxlen;
376  mystreamer->bytes_written = 0;
377  next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
378  }
379  else
380  {
381  avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
382  next_out += mystreamer->bytes_written;
383  }
384  }
385 }
386 
387 /*
388  * End-of-stream processing.
389  */
390 static void
391 bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer)
392 {
393  bbstreamer_lz4_frame *mystreamer;
394 
395  mystreamer = (bbstreamer_lz4_frame *) streamer;
396 
397  /*
398  * End of the stream, if there is some pending data in output buffers then
399  * we must forward it to next streamer.
400  */
401  bbstreamer_content(mystreamer->base.bbs_next, NULL,
402  mystreamer->base.bbs_buffer.data,
403  mystreamer->base.bbs_buffer.maxlen,
405 
406  bbstreamer_finalize(mystreamer->base.bbs_next);
407 }
408 
409 /*
410  * Free memory.
411  */
412 static void
413 bbstreamer_lz4_decompressor_free(bbstreamer *streamer)
414 {
415  bbstreamer_lz4_frame *mystreamer;
416 
417  mystreamer = (bbstreamer_lz4_frame *) streamer;
418  bbstreamer_free(streamer->bbs_next);
419  LZ4F_freeDecompressionContext(mystreamer->dctx);
420  pfree(streamer->bbs_buffer.data);
421  pfree(streamer);
422 }
423 #endif
static void bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:126
static void bbstreamer_finalize(bbstreamer *streamer)
Definition: bbstreamer.h:136
bbstreamer_archive_context
Definition: bbstreamer.h:54
@ BBSTREAMER_UNKNOWN
Definition: bbstreamer.h:55
static void bbstreamer_free(bbstreamer *streamer)
Definition: bbstreamer.h:144
bbstreamer * bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compress)
bbstreamer * bbstreamer_lz4_decompressor_new(bbstreamer *next)
static int32 next
Definition: blutils.c:219
unsigned char uint8
Definition: c.h:439
#define PG_COMPRESSION_OPTION_LEVEL
Definition: compression.h:25
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
Definition: logging.h:106
void pfree(void *pointer)
Definition: mcxt.c:1175
void * palloc0(Size size)
Definition: mcxt.c:1099
#define pg_fatal(...)
const void size_t len
const void * data
while(p+4<=pend)
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void(* content)(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:117
StringInfoData bbs_buffer
Definition: bbstreamer.h:102
bbstreamer * bbs_next
Definition: bbstreamer.h:101