PostgreSQL Source Code  git master
bbstreamer_zstd.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * bbstreamer_zstd.c
4  *
5  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/bin/pg_basebackup/bbstreamer_zstd.c
9  *-------------------------------------------------------------------------
10  */
11 
12 #include "postgres_fe.h"
13 
14 #include <unistd.h>
15 
16 #ifdef USE_ZSTD
17 #include <zstd.h>
18 #endif
19 
20 #include "bbstreamer.h"
21 #include "common/logging.h"
22 
23 #ifdef USE_ZSTD
24 
25 typedef struct bbstreamer_zstd_frame
26 {
27  bbstreamer base;
28 
29  ZSTD_CCtx *cctx;
30  ZSTD_DCtx *dctx;
31  ZSTD_outBuffer zstd_outBuf;
32 } bbstreamer_zstd_frame;
33 
34 static void bbstreamer_zstd_compressor_content(bbstreamer *streamer,
35  bbstreamer_member *member,
36  const char *data, int len,
38 static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer);
39 static void bbstreamer_zstd_compressor_free(bbstreamer *streamer);
40 
41 const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
42  .content = bbstreamer_zstd_compressor_content,
43  .finalize = bbstreamer_zstd_compressor_finalize,
44  .free = bbstreamer_zstd_compressor_free
45 };
46 
47 static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
48  bbstreamer_member *member,
49  const char *data, int len,
51 static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer);
52 static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer);
53 
54 const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
55  .content = bbstreamer_zstd_decompressor_content,
56  .finalize = bbstreamer_zstd_decompressor_finalize,
57  .free = bbstreamer_zstd_decompressor_free
58 };
59 #endif
60 
61 /*
62  * Create a new base backup streamer that performs zstd compression of tar
63  * blocks.
64  */
65 bbstreamer *
67 {
68 #ifdef USE_ZSTD
69  bbstreamer_zstd_frame *streamer;
70  size_t ret;
71 
72  Assert(next != NULL);
73 
74  streamer = palloc0(sizeof(bbstreamer_zstd_frame));
75 
76  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
77  &bbstreamer_zstd_compressor_ops;
78 
79  streamer->base.bbs_next = next;
80  initStringInfo(&streamer->base.bbs_buffer);
81  enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
82 
83  streamer->cctx = ZSTD_createCCtx();
84  if (!streamer->cctx)
85  pg_fatal("could not create zstd compression context");
86 
87  /* Set compression level */
88  ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
89  compress->level);
90  if (ZSTD_isError(ret))
91  pg_fatal("could not set zstd compression level to %d: %s",
92  compress->level, ZSTD_getErrorName(ret));
93 
94  /* Set # of workers, if specified */
95  if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
96  {
97  /*
98  * On older versions of libzstd, this option does not exist, and
99  * trying to set it will fail. Similarly for newer versions if they
100  * are compiled without threading support.
101  */
102  ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
103  compress->workers);
104  if (ZSTD_isError(ret))
105  pg_fatal("could not set compression worker count to %d: %s",
106  compress->workers, ZSTD_getErrorName(ret));
107  }
108 
109  /* Initialize the ZSTD output buffer. */
110  streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
111  streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
112  streamer->zstd_outBuf.pos = 0;
113 
114  return &streamer->base;
115 #else
116  pg_fatal("this build does not support compression with %s", "ZSTD");
117  return NULL; /* keep compiler quiet */
118 #endif
119 }
120 
121 #ifdef USE_ZSTD
122 /*
123  * Compress the input data to output buffer.
124  *
125  * Find out the compression bound based on input data length for each
126  * invocation to make sure that output buffer has enough capacity to
127  * accommodate the compressed data. In case if the output buffer
128  * capacity falls short of compression bound then forward the content
129  * of output buffer to next streamer and empty the buffer.
130  */
131 static void
132 bbstreamer_zstd_compressor_content(bbstreamer *streamer,
133  bbstreamer_member *member,
134  const char *data, int len,
136 {
137  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
138  ZSTD_inBuffer inBuf = {data, len, 0};
139 
140  while (inBuf.pos < inBuf.size)
141  {
142  size_t yet_to_flush;
143  size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
144 
145  /*
146  * If the output buffer is not left with enough space, send the
147  * compressed bytes to the next streamer, and empty the buffer.
148  */
149  if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
150  max_needed)
151  {
152  bbstreamer_content(mystreamer->base.bbs_next, member,
153  mystreamer->zstd_outBuf.dst,
154  mystreamer->zstd_outBuf.pos,
155  context);
156 
157  /* Reset the ZSTD output buffer. */
158  mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
159  mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
160  mystreamer->zstd_outBuf.pos = 0;
161  }
162 
163  yet_to_flush =
164  ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
165  &inBuf, ZSTD_e_continue);
166 
167  if (ZSTD_isError(yet_to_flush))
168  pg_log_error("could not compress data: %s",
169  ZSTD_getErrorName(yet_to_flush));
170  }
171 }
172 
173 /*
174  * End-of-stream processing.
175  */
176 static void
177 bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
178 {
179  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
180  size_t yet_to_flush;
181 
182  do
183  {
184  ZSTD_inBuffer in = {NULL, 0, 0};
185  size_t max_needed = ZSTD_compressBound(0);
186 
187  /*
188  * If the output buffer is not left with enough space, send the
189  * compressed bytes to the next streamer, and empty the buffer.
190  */
191  if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
192  max_needed)
193  {
194  bbstreamer_content(mystreamer->base.bbs_next, NULL,
195  mystreamer->zstd_outBuf.dst,
196  mystreamer->zstd_outBuf.pos,
198 
199  /* Reset the ZSTD output buffer. */
200  mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
201  mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
202  mystreamer->zstd_outBuf.pos = 0;
203  }
204 
205  yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
206  &mystreamer->zstd_outBuf,
207  &in, ZSTD_e_end);
208 
209  if (ZSTD_isError(yet_to_flush))
210  pg_log_error("could not compress data: %s",
211  ZSTD_getErrorName(yet_to_flush));
212 
213  } while (yet_to_flush > 0);
214 
215  /* Make sure to pass any remaining bytes to the next streamer. */
216  if (mystreamer->zstd_outBuf.pos > 0)
217  bbstreamer_content(mystreamer->base.bbs_next, NULL,
218  mystreamer->zstd_outBuf.dst,
219  mystreamer->zstd_outBuf.pos,
221 
222  bbstreamer_finalize(mystreamer->base.bbs_next);
223 }
224 
225 /*
226  * Free memory.
227  */
228 static void
229 bbstreamer_zstd_compressor_free(bbstreamer *streamer)
230 {
231  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
232 
233  bbstreamer_free(streamer->bbs_next);
234  ZSTD_freeCCtx(mystreamer->cctx);
235  pfree(streamer->bbs_buffer.data);
236  pfree(streamer);
237 }
238 #endif
239 
240 /*
241  * Create a new base backup streamer that performs decompression of zstd
242  * compressed blocks.
243  */
244 bbstreamer *
246 {
247 #ifdef USE_ZSTD
248  bbstreamer_zstd_frame *streamer;
249 
250  Assert(next != NULL);
251 
252  streamer = palloc0(sizeof(bbstreamer_zstd_frame));
253  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
254  &bbstreamer_zstd_decompressor_ops;
255 
256  streamer->base.bbs_next = next;
257  initStringInfo(&streamer->base.bbs_buffer);
258  enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
259 
260  streamer->dctx = ZSTD_createDCtx();
261  if (!streamer->dctx)
262  pg_fatal("could not create zstd decompression context");
263 
264  /* Initialize the ZSTD output buffer. */
265  streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
266  streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
267  streamer->zstd_outBuf.pos = 0;
268 
269  return &streamer->base;
270 #else
271  pg_fatal("this build does not support compression with %s", "ZSTD");
272  return NULL; /* keep compiler quiet */
273 #endif
274 }
275 
276 #ifdef USE_ZSTD
277 /*
278  * Decompress the input data to output buffer until we run out of input
279  * data. Each time the output buffer is full, pass on the decompressed data
280  * to the next streamer.
281  */
282 static void
283 bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
284  bbstreamer_member *member,
285  const char *data, int len,
287 {
288  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
289  ZSTD_inBuffer inBuf = {data, len, 0};
290 
291  while (inBuf.pos < inBuf.size)
292  {
293  size_t ret;
294 
295  /*
296  * If output buffer is full then forward the content to next streamer
297  * and update the output buffer.
298  */
299  if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
300  {
301  bbstreamer_content(mystreamer->base.bbs_next, member,
302  mystreamer->zstd_outBuf.dst,
303  mystreamer->zstd_outBuf.pos,
304  context);
305 
306  /* Reset the ZSTD output buffer. */
307  mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
308  mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
309  mystreamer->zstd_outBuf.pos = 0;
310  }
311 
312  ret = ZSTD_decompressStream(mystreamer->dctx,
313  &mystreamer->zstd_outBuf, &inBuf);
314 
315  if (ZSTD_isError(ret))
316  pg_log_error("could not decompress data: %s",
317  ZSTD_getErrorName(ret));
318  }
319 }
320 
321 /*
322  * End-of-stream processing.
323  */
324 static void
325 bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer)
326 {
327  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
328 
329  /*
330  * End of the stream, if there is some pending data in output buffers then
331  * we must forward it to next streamer.
332  */
333  if (mystreamer->zstd_outBuf.pos > 0)
334  bbstreamer_content(mystreamer->base.bbs_next, NULL,
335  mystreamer->base.bbs_buffer.data,
336  mystreamer->base.bbs_buffer.maxlen,
338 
339  bbstreamer_finalize(mystreamer->base.bbs_next);
340 }
341 
342 /*
343  * Free memory.
344  */
345 static void
346 bbstreamer_zstd_decompressor_free(bbstreamer *streamer)
347 {
348  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
349 
350  bbstreamer_free(streamer->bbs_next);
351  ZSTD_freeDCtx(mystreamer->dctx);
352  pfree(streamer->bbs_buffer.data);
353  pfree(streamer);
354 }
355 #endif
static void bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:126
static void bbstreamer_finalize(bbstreamer *streamer)
Definition: bbstreamer.h:136
bbstreamer_archive_context
Definition: bbstreamer.h:54
@ BBSTREAMER_UNKNOWN
Definition: bbstreamer.h:55
static void bbstreamer_free(bbstreamer *streamer)
Definition: bbstreamer.h:144
bbstreamer * bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress)
bbstreamer * bbstreamer_zstd_decompressor_new(bbstreamer *next)
static int32 next
Definition: blutils.c:219
#define PG_COMPRESSION_OPTION_WORKERS
Definition: compression.h:25
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
Definition: logging.h:106
void pfree(void *pointer)
Definition: mcxt.c:1436
void * palloc0(Size size)
Definition: mcxt.c:1241
#define pg_fatal(...)
const void size_t len
const void * data
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:283
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void(* content)(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:117
StringInfoData bbs_buffer
Definition: bbstreamer.h:102
bbstreamer * bbs_next
Definition: bbstreamer.h:101