PostgreSQL Source Code  git master
bbstreamer_zstd.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * bbstreamer_zstd.c
4  *
5  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/bin/pg_basebackup/bbstreamer_zstd.c
9  *-------------------------------------------------------------------------
10  */
11 
12 #include "postgres_fe.h"
13 
14 #include <unistd.h>
15 
16 #ifdef USE_ZSTD
17 #include <zstd.h>
18 #endif
19 
20 #include "bbstreamer.h"
21 #include "common/logging.h"
22 
23 #ifdef USE_ZSTD
24 
25 typedef struct bbstreamer_zstd_frame
26 {
27  bbstreamer base;
28 
29  ZSTD_CCtx *cctx;
30  ZSTD_DCtx *dctx;
31  ZSTD_outBuffer zstd_outBuf;
32 } bbstreamer_zstd_frame;
33 
34 static void bbstreamer_zstd_compressor_content(bbstreamer *streamer,
35  bbstreamer_member *member,
36  const char *data, int len,
38 static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer);
39 static void bbstreamer_zstd_compressor_free(bbstreamer *streamer);
40 
41 const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
42  .content = bbstreamer_zstd_compressor_content,
43  .finalize = bbstreamer_zstd_compressor_finalize,
44  .free = bbstreamer_zstd_compressor_free
45 };
46 
47 static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
48  bbstreamer_member *member,
49  const char *data, int len,
51 static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer);
52 static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer);
53 
54 const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
55  .content = bbstreamer_zstd_decompressor_content,
56  .finalize = bbstreamer_zstd_decompressor_finalize,
57  .free = bbstreamer_zstd_decompressor_free
58 };
59 #endif
60 
61 /*
62  * Create a new base backup streamer that performs zstd compression of tar
63  * blocks.
64  */
65 bbstreamer *
67 {
68 #ifdef USE_ZSTD
69  bbstreamer_zstd_frame *streamer;
70  size_t ret;
71 
72  Assert(next != NULL);
73 
74  streamer = palloc0(sizeof(bbstreamer_zstd_frame));
75 
76  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
77  &bbstreamer_zstd_compressor_ops;
78 
79  streamer->base.bbs_next = next;
80  initStringInfo(&streamer->base.bbs_buffer);
81  enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
82 
83  streamer->cctx = ZSTD_createCCtx();
84  if (!streamer->cctx)
85  pg_fatal("could not create zstd compression context");
86 
87  /* Set compression level, if specified */
88  if ((compress->options & PG_COMPRESSION_OPTION_LEVEL) != 0)
89  {
90  ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
91  compress->level);
92  if (ZSTD_isError(ret))
93  pg_fatal("could not set zstd compression level to %d: %s",
94  compress->level, ZSTD_getErrorName(ret));
95  }
96 
97  /* Set # of workers, if specified */
98  if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
99  {
100  /*
101  * On older versions of libzstd, this option does not exist, and
102  * trying to set it will fail. Similarly for newer versions if they
103  * are compiled without threading support.
104  */
105  ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
106  compress->workers);
107  if (ZSTD_isError(ret))
108  pg_fatal("could not set compression worker count to %d: %s",
109  compress->workers, ZSTD_getErrorName(ret));
110  }
111 
112  /* Initialize the ZSTD output buffer. */
113  streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
114  streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
115  streamer->zstd_outBuf.pos = 0;
116 
117  return &streamer->base;
118 #else
119  pg_fatal("this build does not support zstd compression");
120  return NULL; /* keep compiler quiet */
121 #endif
122 }
123 
124 #ifdef USE_ZSTD
125 /*
126  * Compress the input data to output buffer.
127  *
128  * Find out the compression bound based on input data length for each
129  * invocation to make sure that output buffer has enough capacity to
130  * accommodate the compressed data. In case if the output buffer
131  * capacity falls short of compression bound then forward the content
132  * of output buffer to next streamer and empty the buffer.
133  */
134 static void
135 bbstreamer_zstd_compressor_content(bbstreamer *streamer,
136  bbstreamer_member *member,
137  const char *data, int len,
139 {
140  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
141  ZSTD_inBuffer inBuf = {data, len, 0};
142 
143  while (inBuf.pos < inBuf.size)
144  {
145  size_t yet_to_flush;
146  size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
147 
148  /*
149  * If the output buffer is not left with enough space, send the
150  * compressed bytes to the next streamer, and empty the buffer.
151  */
152  if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
153  max_needed)
154  {
155  bbstreamer_content(mystreamer->base.bbs_next, member,
156  mystreamer->zstd_outBuf.dst,
157  mystreamer->zstd_outBuf.pos,
158  context);
159 
160  /* Reset the ZSTD output buffer. */
161  mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
162  mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
163  mystreamer->zstd_outBuf.pos = 0;
164  }
165 
166  yet_to_flush =
167  ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
168  &inBuf, ZSTD_e_continue);
169 
170  if (ZSTD_isError(yet_to_flush))
171  pg_log_error("could not compress data: %s",
172  ZSTD_getErrorName(yet_to_flush));
173  }
174 }
175 
176 /*
177  * End-of-stream processing.
178  */
179 static void
180 bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
181 {
182  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
183  size_t yet_to_flush;
184 
185  do
186  {
187  ZSTD_inBuffer in = {NULL, 0, 0};
188  size_t max_needed = ZSTD_compressBound(0);
189 
190  /*
191  * If the output buffer is not left with enough space, send the
192  * compressed bytes to the next streamer, and empty the buffer.
193  */
194  if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
195  max_needed)
196  {
197  bbstreamer_content(mystreamer->base.bbs_next, NULL,
198  mystreamer->zstd_outBuf.dst,
199  mystreamer->zstd_outBuf.pos,
201 
202  /* Reset the ZSTD output buffer. */
203  mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
204  mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
205  mystreamer->zstd_outBuf.pos = 0;
206  }
207 
208  yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
209  &mystreamer->zstd_outBuf,
210  &in, ZSTD_e_end);
211 
212  if (ZSTD_isError(yet_to_flush))
213  pg_log_error("could not compress data: %s",
214  ZSTD_getErrorName(yet_to_flush));
215 
216  } while (yet_to_flush > 0);
217 
218  /* Make sure to pass any remaining bytes to the next streamer. */
219  if (mystreamer->zstd_outBuf.pos > 0)
220  bbstreamer_content(mystreamer->base.bbs_next, NULL,
221  mystreamer->zstd_outBuf.dst,
222  mystreamer->zstd_outBuf.pos,
224 
225  bbstreamer_finalize(mystreamer->base.bbs_next);
226 }
227 
228 /*
229  * Free memory.
230  */
231 static void
232 bbstreamer_zstd_compressor_free(bbstreamer *streamer)
233 {
234  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
235 
236  bbstreamer_free(streamer->bbs_next);
237  ZSTD_freeCCtx(mystreamer->cctx);
238  pfree(streamer->bbs_buffer.data);
239  pfree(streamer);
240 }
241 #endif
242 
243 /*
244  * Create a new base backup streamer that performs decompression of zstd
245  * compressed blocks.
246  */
247 bbstreamer *
249 {
250 #ifdef USE_ZSTD
251  bbstreamer_zstd_frame *streamer;
252 
253  Assert(next != NULL);
254 
255  streamer = palloc0(sizeof(bbstreamer_zstd_frame));
256  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
257  &bbstreamer_zstd_decompressor_ops;
258 
259  streamer->base.bbs_next = next;
260  initStringInfo(&streamer->base.bbs_buffer);
261  enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
262 
263  streamer->dctx = ZSTD_createDCtx();
264  if (!streamer->dctx)
265  pg_fatal("could not create zstd decompression context");
266 
267  /* Initialize the ZSTD output buffer. */
268  streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
269  streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
270  streamer->zstd_outBuf.pos = 0;
271 
272  return &streamer->base;
273 #else
274  pg_fatal("this build does not support zstd compression");
275  return NULL; /* keep compiler quiet */
276 #endif
277 }
278 
279 #ifdef USE_ZSTD
280 /*
281  * Decompress the input data to output buffer until we run out of input
282  * data. Each time the output buffer is full, pass on the decompressed data
283  * to the next streamer.
284  */
285 static void
286 bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
287  bbstreamer_member *member,
288  const char *data, int len,
290 {
291  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
292  ZSTD_inBuffer inBuf = {data, len, 0};
293 
294  while (inBuf.pos < inBuf.size)
295  {
296  size_t ret;
297 
298  /*
299  * If output buffer is full then forward the content to next streamer
300  * and update the output buffer.
301  */
302  if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
303  {
304  bbstreamer_content(mystreamer->base.bbs_next, member,
305  mystreamer->zstd_outBuf.dst,
306  mystreamer->zstd_outBuf.pos,
307  context);
308 
309  /* Reset the ZSTD output buffer. */
310  mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
311  mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
312  mystreamer->zstd_outBuf.pos = 0;
313  }
314 
315  ret = ZSTD_decompressStream(mystreamer->dctx,
316  &mystreamer->zstd_outBuf, &inBuf);
317 
318  if (ZSTD_isError(ret))
319  pg_log_error("could not decompress data: %s",
320  ZSTD_getErrorName(ret));
321  }
322 }
323 
324 /*
325  * End-of-stream processing.
326  */
327 static void
328 bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer)
329 {
330  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
331 
332  /*
333  * End of the stream, if there is some pending data in output buffers then
334  * we must forward it to next streamer.
335  */
336  if (mystreamer->zstd_outBuf.pos > 0)
337  bbstreamer_content(mystreamer->base.bbs_next, NULL,
338  mystreamer->base.bbs_buffer.data,
339  mystreamer->base.bbs_buffer.maxlen,
341 
342  bbstreamer_finalize(mystreamer->base.bbs_next);
343 }
344 
345 /*
346  * Free memory.
347  */
348 static void
349 bbstreamer_zstd_decompressor_free(bbstreamer *streamer)
350 {
351  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
352 
353  bbstreamer_free(streamer->bbs_next);
354  ZSTD_freeDCtx(mystreamer->dctx);
355  pfree(streamer->bbs_buffer.data);
356  pfree(streamer);
357 }
358 #endif
static void bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:126
static void bbstreamer_finalize(bbstreamer *streamer)
Definition: bbstreamer.h:136
bbstreamer_archive_context
Definition: bbstreamer.h:54
@ BBSTREAMER_UNKNOWN
Definition: bbstreamer.h:55
static void bbstreamer_free(bbstreamer *streamer)
Definition: bbstreamer.h:144
bbstreamer * bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress)
bbstreamer * bbstreamer_zstd_decompressor_new(bbstreamer *next)
static int32 next
Definition: blutils.c:219
#define PG_COMPRESSION_OPTION_LEVEL
Definition: compression.h:25
#define PG_COMPRESSION_OPTION_WORKERS
Definition: compression.h:26
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
Definition: logging.h:106
void pfree(void *pointer)
Definition: mcxt.c:1175
void * palloc0(Size size)
Definition: mcxt.c:1099
#define pg_fatal(...)
const void size_t len
const void * data
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void(* content)(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:117
StringInfoData bbs_buffer
Definition: bbstreamer.h:102
bbstreamer * bbs_next
Definition: bbstreamer.h:101