PostgreSQL Source Code  git master
basebackup_lz4.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * basebackup_lz4.c
4  * Basebackup sink implementing lz4 compression.
5  *
6  * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/basebackup_lz4.c
10  *
11  *-------------------------------------------------------------------------
12  */
#include "postgres.h"

#ifdef USE_LZ4
#include <lz4frame.h>
#endif

#include "replication/basebackup_sink.h"

#ifdef USE_LZ4

/*
 * Private state of an lz4-compressing bbsink.
 *
 * The embedded "base" member must stay first: the callbacks below receive a
 * plain bbsink pointer and cast it back to bbsink_lz4.
 */
typedef struct bbsink_lz4
{
	/* Common information for all types of sink. */
	bbsink		base;

	/* Compression level chosen at creation time (0 if none was specified). */
	int			compresslevel;

	/* lz4 frame compression context; NULL while no archive is open. */
	LZ4F_compressionContext_t ctx;

	/* lz4 frame preferences, filled in when the backup begins. */
	LZ4F_preferences_t prefs;

	/* Number of bytes currently staged in the successor sink's buffer. */
	size_t		bytes_written;
} bbsink_lz4;

/* Per-archive callbacks. */
static void bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name);
static void bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in);
static void bbsink_lz4_end_archive(bbsink *sink);

/* Whole-backup and manifest callbacks. */
static void bbsink_lz4_begin_backup(bbsink *sink);
static void bbsink_lz4_manifest_contents(bbsink *sink, size_t len);
static void bbsink_lz4_cleanup(bbsink *sink);

/*
 * Callback table.  Manifest begin/end and end-of-backup need no lz4-specific
 * work, so they use the generic bbsink_forward_* implementations.
 */
static const bbsink_ops bbsink_lz4_ops = {
	.begin_backup = bbsink_lz4_begin_backup,
	.begin_archive = bbsink_lz4_begin_archive,
	.archive_contents = bbsink_lz4_archive_contents,
	.end_archive = bbsink_lz4_end_archive,
	.begin_manifest = bbsink_forward_begin_manifest,
	.manifest_contents = bbsink_lz4_manifest_contents,
	.end_manifest = bbsink_forward_end_manifest,
	.end_backup = bbsink_forward_end_backup,
	.cleanup = bbsink_lz4_cleanup
};
#endif
57 
58 /*
59  * Create a new basebackup sink that performs lz4 compression.
60  */
61 bbsink *
63 {
64 #ifndef USE_LZ4
65  ereport(ERROR,
66  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
67  errmsg("lz4 compression is not supported by this build")));
68  return NULL; /* keep compiler quiet */
69 #else
70  bbsink_lz4 *sink;
71  int compresslevel;
72 
73  Assert(next != NULL);
74 
75  if ((compress->options & PG_COMPRESSION_OPTION_LEVEL) == 0)
76  compresslevel = 0;
77  else
78  {
79  compresslevel = compress->level;
80  Assert(compresslevel >= 1 && compresslevel <= 12);
81  }
82 
83  sink = palloc0(sizeof(bbsink_lz4));
84  *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
85  sink->base.bbs_next = next;
86  sink->compresslevel = compresslevel;
87 
88  return &sink->base;
89 #endif
90 }
91 
92 #ifdef USE_LZ4
93 
94 /*
95  * Begin backup.
96  */
97 static void
98 bbsink_lz4_begin_backup(bbsink *sink)
99 {
100  bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
101  size_t output_buffer_bound;
102  LZ4F_preferences_t *prefs = &mysink->prefs;
103 
104  /* Initialize compressor object. */
105  memset(prefs, 0, sizeof(LZ4F_preferences_t));
106  prefs->frameInfo.blockSizeID = LZ4F_max256KB;
107  prefs->compressionLevel = mysink->compresslevel;
108 
109  /*
110  * We need our own buffer, because we're going to pass different data to
111  * the next sink than what gets passed to us.
112  */
113  mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
114 
115  /*
116  * Since LZ4F_compressUpdate() requires the output buffer of size equal or
117  * greater than that of LZ4F_compressBound(), make sure we have the next
118  * sink's bbs_buffer of length that can accommodate the compressed input
119  * buffer.
120  */
121  output_buffer_bound = LZ4F_compressBound(mysink->base.bbs_buffer_length,
122  &mysink->prefs);
123 
124  /*
125  * The buffer length is expected to be a multiple of BLCKSZ, so round up.
126  */
127  output_buffer_bound = output_buffer_bound + BLCKSZ -
128  (output_buffer_bound % BLCKSZ);
129 
130  bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
131 }
132 
133 /*
134  * Prepare to compress the next archive.
135  */
136 static void
137 bbsink_lz4_begin_archive(bbsink *sink, const char *archive_name)
138 {
139  bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
140  char *lz4_archive_name;
141  LZ4F_errorCode_t ctxError;
142  size_t headerSize;
143 
144  ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
145  if (LZ4F_isError(ctxError))
146  elog(ERROR, "could not create lz4 compression context: %s",
147  LZ4F_getErrorName(ctxError));
148 
149  /* First of all write the frame header to destination buffer. */
150  headerSize = LZ4F_compressBegin(mysink->ctx,
151  mysink->base.bbs_next->bbs_buffer,
152  mysink->base.bbs_next->bbs_buffer_length,
153  &mysink->prefs);
154 
155  if (LZ4F_isError(headerSize))
156  elog(ERROR, "could not write lz4 header: %s",
157  LZ4F_getErrorName(headerSize));
158 
159  /*
160  * We need to write the compressed data after the header in the output
161  * buffer. So, make sure to update the notion of bytes written to output
162  * buffer.
163  */
164  mysink->bytes_written += headerSize;
165 
166  /* Add ".lz4" to the archive name. */
167  lz4_archive_name = psprintf("%s.lz4", archive_name);
168  Assert(sink->bbs_next != NULL);
169  bbsink_begin_archive(sink->bbs_next, lz4_archive_name);
170  pfree(lz4_archive_name);
171 }
172 
173 /*
174  * Compress the input data to the output buffer until we run out of input
175  * data. Each time the output buffer falls below the compression bound for
176  * the input buffer, invoke the archive_contents() method for then next sink.
177  *
178  * Note that since we're compressing the input, it may very commonly happen
179  * that we consume all the input data without filling the output buffer. In
180  * that case, the compressed representation of the current input data won't
181  * actually be sent to the next bbsink until a later call to this function,
182  * or perhaps even not until bbsink_lz4_end_archive() is invoked.
183  */
184 static void
185 bbsink_lz4_archive_contents(bbsink *sink, size_t avail_in)
186 {
187  bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
188  size_t compressedSize;
189  size_t avail_in_bound;
190 
191  avail_in_bound = LZ4F_compressBound(avail_in, &mysink->prefs);
192 
193  /*
194  * If the number of available bytes has fallen below the value computed by
195  * LZ4F_compressBound(), ask the next sink to process the data so that we
196  * can empty the buffer.
197  */
198  if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
199  avail_in_bound)
200  {
201  bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
202  mysink->bytes_written = 0;
203  }
204 
205  /*
206  * Compress the input buffer and write it into the output buffer.
207  */
208  compressedSize = LZ4F_compressUpdate(mysink->ctx,
209  mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
210  mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
211  (uint8 *) mysink->base.bbs_buffer,
212  avail_in,
213  NULL);
214 
215  if (LZ4F_isError(compressedSize))
216  elog(ERROR, "could not compress data: %s",
217  LZ4F_getErrorName(compressedSize));
218 
219  /*
220  * Update our notion of how many bytes we've written into output buffer.
221  */
222  mysink->bytes_written += compressedSize;
223 }
224 
225 /*
226  * There might be some data inside lz4's internal buffers; we need to get
227  * that flushed out and also finalize the lz4 frame and then get that forwarded
228  * to the successor sink as archive content.
229  *
230  * Then we can end processing for this archive.
231  */
232 static void
233 bbsink_lz4_end_archive(bbsink *sink)
234 {
235  bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
236  size_t compressedSize;
237  size_t lz4_footer_bound;
238 
239  lz4_footer_bound = LZ4F_compressBound(0, &mysink->prefs);
240 
241  Assert(mysink->base.bbs_next->bbs_buffer_length >= lz4_footer_bound);
242 
243  if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
244  lz4_footer_bound)
245  {
246  bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
247  mysink->bytes_written = 0;
248  }
249 
250  compressedSize = LZ4F_compressEnd(mysink->ctx,
251  mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
252  mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
253  NULL);
254 
255  if (LZ4F_isError(compressedSize))
256  elog(ERROR, "could not end lz4 compression: %s",
257  LZ4F_getErrorName(compressedSize));
258 
259  /* Update our notion of how many bytes we've written. */
260  mysink->bytes_written += compressedSize;
261 
262  /* Send whatever accumulated output bytes we have. */
263  bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
264  mysink->bytes_written = 0;
265 
266  /* Release the resources. */
267  LZ4F_freeCompressionContext(mysink->ctx);
268  mysink->ctx = NULL;
269 
270  /* Pass on the information that this archive has ended. */
272 }
273 
274 /*
275  * Manifest contents are not compressed, but we do need to copy them into
276  * the successor sink's buffer, because we have our own.
277  */
278 static void
279 bbsink_lz4_manifest_contents(bbsink *sink, size_t len)
280 {
281  memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
283 }
284 
285 /*
286  * In case the backup fails, make sure we free the compression context by
287  * calling LZ4F_freeCompressionContext() if needed to avoid memory leak.
288  */
289 static void
290 bbsink_lz4_cleanup(bbsink *sink)
291 {
292  bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
293 
294  if (mysink->ctx)
295  {
296  LZ4F_freeCompressionContext(mysink->ctx);
297  mysink->ctx = NULL;
298  }
299 }
300 
301 #endif
bbsink * bbsink_lz4_new(bbsink *next, pg_compress_specification *compress)
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length)
static void bbsink_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_archive_contents(bbsink *sink, size_t len)
static void bbsink_manifest_contents(bbsink *sink, size_t len)
static int32 next
Definition: blutils.c:219
unsigned char uint8
Definition: c.h:439
#define PG_COMPRESSION_OPTION_LEVEL
Definition: compression.h:25
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define ereport(elevel,...)
Definition: elog.h:143
Assert(fmt[strlen(fmt) - 1] !='\n')
void pfree(void *pointer)
Definition: mcxt.c:1175
void * palloc0(Size size)
Definition: mcxt.c:1099
void * palloc(Size size)
Definition: mcxt.c:1068
const void size_t len
static int compresslevel
Definition: pg_receivewal.c:45
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
void(* begin_backup)(bbsink *sink)
bbsink * bbs_next
bbsink_state * bbs_state
char * bbs_buffer