PostgreSQL Source Code  git master
bbstreamer_lz4.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * bbstreamer_lz4.c
4  *
5  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/bin/pg_basebackup/bbstreamer_lz4.c
9  *-------------------------------------------------------------------------
10  */
11 
12 #include "postgres_fe.h"
13 
14 #include <unistd.h>
15 
16 #ifdef USE_LZ4
17 #include <lz4frame.h>
18 #endif
19 
20 #include "bbstreamer.h"
21 #include "common/logging.h"
22 #include "common/file_perm.h"
23 #include "common/string.h"
24 
25 #ifdef USE_LZ4
26 typedef struct bbstreamer_lz4_frame
27 {
28  bbstreamer base;
29 
30  LZ4F_compressionContext_t cctx;
31  LZ4F_decompressionContext_t dctx;
32  LZ4F_preferences_t prefs;
33 
34  size_t bytes_written;
35  bool header_written;
36 } bbstreamer_lz4_frame;
37 
38 static void bbstreamer_lz4_compressor_content(bbstreamer *streamer,
39  bbstreamer_member *member,
40  const char *data, int len,
42 static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer);
43 static void bbstreamer_lz4_compressor_free(bbstreamer *streamer);
44 
45 const bbstreamer_ops bbstreamer_lz4_compressor_ops = {
46  .content = bbstreamer_lz4_compressor_content,
47  .finalize = bbstreamer_lz4_compressor_finalize,
48  .free = bbstreamer_lz4_compressor_free
49 };
50 
51 static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
52  bbstreamer_member *member,
53  const char *data, int len,
55 static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer);
56 static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer);
57 
58 const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
59  .content = bbstreamer_lz4_decompressor_content,
60  .finalize = bbstreamer_lz4_decompressor_finalize,
61  .free = bbstreamer_lz4_decompressor_free
62 };
63 #endif
64 
65 /*
66  * Create a new base backup streamer that performs lz4 compression of tar
67  * blocks.
68  */
69 bbstreamer *
71 {
72 #ifdef USE_LZ4
73  bbstreamer_lz4_frame *streamer;
74  LZ4F_errorCode_t ctxError;
75  LZ4F_preferences_t *prefs;
76 
77  Assert(next != NULL);
78 
79  streamer = palloc0(sizeof(bbstreamer_lz4_frame));
80  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
81  &bbstreamer_lz4_compressor_ops;
82 
83  streamer->base.bbs_next = next;
84  initStringInfo(&streamer->base.bbs_buffer);
85  streamer->header_written = false;
86 
87  /* Initialize stream compression preferences */
88  prefs = &streamer->prefs;
89  memset(prefs, 0, sizeof(LZ4F_preferences_t));
90  prefs->frameInfo.blockSizeID = LZ4F_max256KB;
91  prefs->compressionLevel = compress->level;
92 
93  ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
94  if (LZ4F_isError(ctxError))
95  pg_log_error("could not create lz4 compression context: %s",
96  LZ4F_getErrorName(ctxError));
97 
98  return &streamer->base;
99 #else
100  pg_fatal("this build does not support compression with %s", "LZ4");
101  return NULL; /* keep compiler quiet */
102 #endif
103 }
104 
105 #ifdef USE_LZ4
106 /*
107  * Compress the input data to output buffer.
108  *
109  * Find out the compression bound based on input data length for each
110  * invocation to make sure that output buffer has enough capacity to
111  * accommodate the compressed data. In case if the output buffer
112  * capacity falls short of compression bound then forward the content
113  * of output buffer to next streamer and empty the buffer.
114  */
115 static void
116 bbstreamer_lz4_compressor_content(bbstreamer *streamer,
117  bbstreamer_member *member,
118  const char *data, int len,
120 {
121  bbstreamer_lz4_frame *mystreamer;
122  uint8 *next_in,
123  *next_out;
124  size_t out_bound,
125  compressed_size,
126  avail_out;
127 
128  mystreamer = (bbstreamer_lz4_frame *) streamer;
129  next_in = (uint8 *) data;
130 
131  /* Write header before processing the first input chunk. */
132  if (!mystreamer->header_written)
133  {
134  compressed_size = LZ4F_compressBegin(mystreamer->cctx,
135  (uint8 *) mystreamer->base.bbs_buffer.data,
136  mystreamer->base.bbs_buffer.maxlen,
137  &mystreamer->prefs);
138 
139  if (LZ4F_isError(compressed_size))
140  pg_log_error("could not write lz4 header: %s",
141  LZ4F_getErrorName(compressed_size));
142 
143  mystreamer->bytes_written += compressed_size;
144  mystreamer->header_written = true;
145  }
146 
147  /*
148  * Update the offset and capacity of output buffer based on number of
149  * bytes written to output buffer.
150  */
151  next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
152  avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
153 
154  /*
155  * Find out the compression bound and make sure that output buffer has the
156  * required capacity for the success of LZ4F_compressUpdate. If needed
157  * forward the content to next streamer and empty the buffer.
158  */
159  out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
160  if (avail_out < out_bound)
161  {
162  bbstreamer_content(mystreamer->base.bbs_next, member,
163  mystreamer->base.bbs_buffer.data,
164  mystreamer->bytes_written,
165  context);
166 
167  /* Enlarge buffer if it falls short of out bound. */
168  if (mystreamer->base.bbs_buffer.maxlen < out_bound)
169  enlargeStringInfo(&mystreamer->base.bbs_buffer, out_bound);
170 
171  avail_out = mystreamer->base.bbs_buffer.maxlen;
172  mystreamer->bytes_written = 0;
173  next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
174  }
175 
176  /*
177  * This call compresses the data starting at next_in and generates the
178  * output starting at next_out. It expects the caller to provide the size
179  * of input buffer and capacity of output buffer by providing parameters
180  * len and avail_out.
181  *
182  * It returns the number of bytes compressed to output buffer.
183  */
184  compressed_size = LZ4F_compressUpdate(mystreamer->cctx,
185  next_out, avail_out,
186  next_in, len, NULL);
187 
188  if (LZ4F_isError(compressed_size))
189  pg_log_error("could not compress data: %s",
190  LZ4F_getErrorName(compressed_size));
191 
192  mystreamer->bytes_written += compressed_size;
193 }
194 
195 /*
196  * End-of-stream processing.
197  */
198 static void
199 bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
200 {
201  bbstreamer_lz4_frame *mystreamer;
202  uint8 *next_out;
203  size_t footer_bound,
204  compressed_size,
205  avail_out;
206 
207  mystreamer = (bbstreamer_lz4_frame *) streamer;
208 
209  /* Find out the footer bound and update the output buffer. */
210  footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
211  if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
212  footer_bound)
213  {
214  bbstreamer_content(mystreamer->base.bbs_next, NULL,
215  mystreamer->base.bbs_buffer.data,
216  mystreamer->bytes_written,
218 
219  /* Enlarge buffer if it falls short of footer bound. */
220  if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
221  enlargeStringInfo(&mystreamer->base.bbs_buffer, footer_bound);
222 
223  avail_out = mystreamer->base.bbs_buffer.maxlen;
224  mystreamer->bytes_written = 0;
225  next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
226  }
227  else
228  {
229  next_out = (uint8 *) mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
230  avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
231  }
232 
233  /*
234  * Finalize the frame and flush whatever data remaining in compression
235  * context.
236  */
237  compressed_size = LZ4F_compressEnd(mystreamer->cctx,
238  next_out, avail_out, NULL);
239 
240  if (LZ4F_isError(compressed_size))
241  pg_log_error("could not end lz4 compression: %s",
242  LZ4F_getErrorName(compressed_size));
243 
244  mystreamer->bytes_written += compressed_size;
245 
246  bbstreamer_content(mystreamer->base.bbs_next, NULL,
247  mystreamer->base.bbs_buffer.data,
248  mystreamer->bytes_written,
250 
251  bbstreamer_finalize(mystreamer->base.bbs_next);
252 }
253 
254 /*
255  * Free memory.
256  */
257 static void
258 bbstreamer_lz4_compressor_free(bbstreamer *streamer)
259 {
260  bbstreamer_lz4_frame *mystreamer;
261 
262  mystreamer = (bbstreamer_lz4_frame *) streamer;
263  bbstreamer_free(streamer->bbs_next);
264  LZ4F_freeCompressionContext(mystreamer->cctx);
265  pfree(streamer->bbs_buffer.data);
266  pfree(streamer);
267 }
268 #endif
269 
270 /*
271  * Create a new base backup streamer that performs decompression of lz4
272  * compressed blocks.
273  */
274 bbstreamer *
276 {
277 #ifdef USE_LZ4
278  bbstreamer_lz4_frame *streamer;
279  LZ4F_errorCode_t ctxError;
280 
281  Assert(next != NULL);
282 
283  streamer = palloc0(sizeof(bbstreamer_lz4_frame));
284  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
285  &bbstreamer_lz4_decompressor_ops;
286 
287  streamer->base.bbs_next = next;
288  initStringInfo(&streamer->base.bbs_buffer);
289 
290  /* Initialize internal stream state for decompression */
291  ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
292  if (LZ4F_isError(ctxError))
293  pg_fatal("could not initialize compression library: %s",
294  LZ4F_getErrorName(ctxError));
295 
296  return &streamer->base;
297 #else
298  pg_fatal("this build does not support compression with %s", "LZ4");
299  return NULL; /* keep compiler quiet */
300 #endif
301 }
302 
303 #ifdef USE_LZ4
304 /*
305  * Decompress the input data to output buffer until we run out of input
306  * data. Each time the output buffer is full, pass on the decompressed data
307  * to the next streamer.
308  */
309 static void
310 bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
311  bbstreamer_member *member,
312  const char *data, int len,
314 {
315  bbstreamer_lz4_frame *mystreamer;
316  uint8 *next_in,
317  *next_out;
318  size_t avail_in,
319  avail_out;
320 
321  mystreamer = (bbstreamer_lz4_frame *) streamer;
322  next_in = (uint8 *) data;
323  next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
324  avail_in = len;
325  avail_out = mystreamer->base.bbs_buffer.maxlen;
326 
327  while (avail_in > 0)
328  {
329  size_t ret,
330  read_size,
331  out_size;
332 
333  read_size = avail_in;
334  out_size = avail_out;
335 
336  /*
337  * This call decompresses the data starting at next_in and generates
338  * the output data starting at next_out. It expects the caller to
339  * provide size of the input buffer and total capacity of the output
340  * buffer by providing the read_size and out_size parameters
341  * respectively.
342  *
343  * Per the documentation of LZ4, parameters read_size and out_size
344  * behaves as dual parameters. On return, the number of bytes consumed
345  * from the input buffer will be written back to read_size and the
346  * number of bytes decompressed to output buffer will be written back
347  * to out_size respectively.
348  */
349  ret = LZ4F_decompress(mystreamer->dctx,
350  next_out, &out_size,
351  next_in, &read_size, NULL);
352 
353  if (LZ4F_isError(ret))
354  pg_log_error("could not decompress data: %s",
355  LZ4F_getErrorName(ret));
356 
357  /* Update input buffer based on number of bytes consumed */
358  avail_in -= read_size;
359  next_in += read_size;
360 
361  mystreamer->bytes_written += out_size;
362 
363  /*
364  * If output buffer is full then forward the content to next streamer
365  * and update the output buffer.
366  */
367  if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
368  {
369  bbstreamer_content(mystreamer->base.bbs_next, member,
370  mystreamer->base.bbs_buffer.data,
371  mystreamer->base.bbs_buffer.maxlen,
372  context);
373 
374  avail_out = mystreamer->base.bbs_buffer.maxlen;
375  mystreamer->bytes_written = 0;
376  next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
377  }
378  else
379  {
380  avail_out = mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
381  next_out += mystreamer->bytes_written;
382  }
383  }
384 }
385 
386 /*
387  * End-of-stream processing.
388  */
389 static void
390 bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer)
391 {
392  bbstreamer_lz4_frame *mystreamer;
393 
394  mystreamer = (bbstreamer_lz4_frame *) streamer;
395 
396  /*
397  * End of the stream, if there is some pending data in output buffers then
398  * we must forward it to next streamer.
399  */
400  bbstreamer_content(mystreamer->base.bbs_next, NULL,
401  mystreamer->base.bbs_buffer.data,
402  mystreamer->base.bbs_buffer.maxlen,
404 
405  bbstreamer_finalize(mystreamer->base.bbs_next);
406 }
407 
408 /*
409  * Free memory.
410  */
411 static void
412 bbstreamer_lz4_decompressor_free(bbstreamer *streamer)
413 {
414  bbstreamer_lz4_frame *mystreamer;
415 
416  mystreamer = (bbstreamer_lz4_frame *) streamer;
417  bbstreamer_free(streamer->bbs_next);
418  LZ4F_freeDecompressionContext(mystreamer->dctx);
419  pfree(streamer->bbs_buffer.data);
420  pfree(streamer);
421 }
422 #endif
static void bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:126
static void bbstreamer_finalize(bbstreamer *streamer)
Definition: bbstreamer.h:136
bbstreamer_archive_context
Definition: bbstreamer.h:54
@ BBSTREAMER_UNKNOWN
Definition: bbstreamer.h:55
static void bbstreamer_free(bbstreamer *streamer)
Definition: bbstreamer.h:144
bbstreamer * bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compress)
bbstreamer * bbstreamer_lz4_decompressor_new(bbstreamer *next)
static int32 next
Definition: blutils.c:219
unsigned char uint8
Definition: c.h:488
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
Definition: logging.h:106
void pfree(void *pointer)
Definition: mcxt.c:1436
void * palloc0(Size size)
Definition: mcxt.c:1241
#define pg_fatal(...)
const void size_t len
const void * data
while(p+4<=pend)
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void(* content)(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:117
StringInfoData bbs_buffer
Definition: bbstreamer.h:102
bbstreamer * bbs_next
Definition: bbstreamer.h:101