PostgreSQL Source Code  git master
bbstreamer_zstd.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * bbstreamer_zstd.c
4  *
5  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/bin/pg_basebackup/bbstreamer_zstd.c
9  *-------------------------------------------------------------------------
10  */
11 
12 #include "postgres_fe.h"
13 
14 #include <unistd.h>
15 
16 #ifdef USE_ZSTD
17 #include <zstd.h>
18 #endif
19 
20 #include "bbstreamer.h"
21 #include "common/logging.h"
22 
23 #ifdef USE_ZSTD
24 
25 typedef struct bbstreamer_zstd_frame
26 {
27  bbstreamer base;
28 
29  ZSTD_CCtx *cctx;
30  ZSTD_DCtx *dctx;
31  ZSTD_outBuffer zstd_outBuf;
32 } bbstreamer_zstd_frame;
33 
34 static void bbstreamer_zstd_compressor_content(bbstreamer *streamer,
35  bbstreamer_member *member,
36  const char *data, int len,
38 static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer);
39 static void bbstreamer_zstd_compressor_free(bbstreamer *streamer);
40 
41 const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
42  .content = bbstreamer_zstd_compressor_content,
43  .finalize = bbstreamer_zstd_compressor_finalize,
44  .free = bbstreamer_zstd_compressor_free
45 };
46 
47 static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
48  bbstreamer_member *member,
49  const char *data, int len,
51 static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer);
52 static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer);
53 
54 const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
55  .content = bbstreamer_zstd_decompressor_content,
56  .finalize = bbstreamer_zstd_decompressor_finalize,
57  .free = bbstreamer_zstd_decompressor_free
58 };
59 #endif
60 
61 /*
62  * Create a new base backup streamer that performs zstd compression of tar
63  * blocks.
64  */
65 bbstreamer *
67 {
68 #ifdef USE_ZSTD
69  bbstreamer_zstd_frame *streamer;
70  size_t ret;
71 
72  Assert(next != NULL);
73 
74  streamer = palloc0(sizeof(bbstreamer_zstd_frame));
75 
76  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
77  &bbstreamer_zstd_compressor_ops;
78 
79  streamer->base.bbs_next = next;
80  initStringInfo(&streamer->base.bbs_buffer);
81  enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
82 
83  streamer->cctx = ZSTD_createCCtx();
84  if (!streamer->cctx)
85  pg_fatal("could not create zstd compression context");
86 
87  /* Set compression level */
88  ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
89  compress->level);
90  if (ZSTD_isError(ret))
91  pg_fatal("could not set zstd compression level to %d: %s",
92  compress->level, ZSTD_getErrorName(ret));
93 
94  /* Set # of workers, if specified */
95  if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
96  {
97  /*
98  * On older versions of libzstd, this option does not exist, and
99  * trying to set it will fail. Similarly for newer versions if they
100  * are compiled without threading support.
101  */
102  ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
103  compress->workers);
104  if (ZSTD_isError(ret))
105  pg_fatal("could not set compression worker count to %d: %s",
106  compress->workers, ZSTD_getErrorName(ret));
107  }
108 
109  if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
110  {
111  ret = ZSTD_CCtx_setParameter(streamer->cctx,
112  ZSTD_c_enableLongDistanceMatching,
113  compress->long_distance);
114  if (ZSTD_isError(ret))
115  {
116  pg_log_error("could not enable long-distance mode: %s",
117  ZSTD_getErrorName(ret));
118  exit(1);
119  }
120  }
121 
122  /* Initialize the ZSTD output buffer. */
123  streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
124  streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
125  streamer->zstd_outBuf.pos = 0;
126 
127  return &streamer->base;
128 #else
129  pg_fatal("this build does not support compression with %s", "ZSTD");
130  return NULL; /* keep compiler quiet */
131 #endif
132 }
133 
134 #ifdef USE_ZSTD
135 /*
136  * Compress the input data to output buffer.
137  *
138  * Find out the compression bound based on input data length for each
139  * invocation to make sure that output buffer has enough capacity to
140  * accommodate the compressed data. In case if the output buffer
141  * capacity falls short of compression bound then forward the content
142  * of output buffer to next streamer and empty the buffer.
143  */
144 static void
145 bbstreamer_zstd_compressor_content(bbstreamer *streamer,
146  bbstreamer_member *member,
147  const char *data, int len,
149 {
150  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
151  ZSTD_inBuffer inBuf = {data, len, 0};
152 
153  while (inBuf.pos < inBuf.size)
154  {
155  size_t yet_to_flush;
156  size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
157 
158  /*
159  * If the output buffer is not left with enough space, send the
160  * compressed bytes to the next streamer, and empty the buffer.
161  */
162  if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
163  max_needed)
164  {
165  bbstreamer_content(mystreamer->base.bbs_next, member,
166  mystreamer->zstd_outBuf.dst,
167  mystreamer->zstd_outBuf.pos,
168  context);
169 
170  /* Reset the ZSTD output buffer. */
171  mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
172  mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
173  mystreamer->zstd_outBuf.pos = 0;
174  }
175 
176  yet_to_flush =
177  ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
178  &inBuf, ZSTD_e_continue);
179 
180  if (ZSTD_isError(yet_to_flush))
181  pg_log_error("could not compress data: %s",
182  ZSTD_getErrorName(yet_to_flush));
183  }
184 }
185 
186 /*
187  * End-of-stream processing.
188  */
189 static void
190 bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
191 {
192  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
193  size_t yet_to_flush;
194 
195  do
196  {
197  ZSTD_inBuffer in = {NULL, 0, 0};
198  size_t max_needed = ZSTD_compressBound(0);
199 
200  /*
201  * If the output buffer is not left with enough space, send the
202  * compressed bytes to the next streamer, and empty the buffer.
203  */
204  if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
205  max_needed)
206  {
207  bbstreamer_content(mystreamer->base.bbs_next, NULL,
208  mystreamer->zstd_outBuf.dst,
209  mystreamer->zstd_outBuf.pos,
211 
212  /* Reset the ZSTD output buffer. */
213  mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
214  mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
215  mystreamer->zstd_outBuf.pos = 0;
216  }
217 
218  yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
219  &mystreamer->zstd_outBuf,
220  &in, ZSTD_e_end);
221 
222  if (ZSTD_isError(yet_to_flush))
223  pg_log_error("could not compress data: %s",
224  ZSTD_getErrorName(yet_to_flush));
225 
226  } while (yet_to_flush > 0);
227 
228  /* Make sure to pass any remaining bytes to the next streamer. */
229  if (mystreamer->zstd_outBuf.pos > 0)
230  bbstreamer_content(mystreamer->base.bbs_next, NULL,
231  mystreamer->zstd_outBuf.dst,
232  mystreamer->zstd_outBuf.pos,
234 
235  bbstreamer_finalize(mystreamer->base.bbs_next);
236 }
237 
238 /*
239  * Free memory.
240  */
241 static void
242 bbstreamer_zstd_compressor_free(bbstreamer *streamer)
243 {
244  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
245 
246  bbstreamer_free(streamer->bbs_next);
247  ZSTD_freeCCtx(mystreamer->cctx);
248  pfree(streamer->bbs_buffer.data);
249  pfree(streamer);
250 }
251 #endif
252 
253 /*
254  * Create a new base backup streamer that performs decompression of zstd
255  * compressed blocks.
256  */
257 bbstreamer *
259 {
260 #ifdef USE_ZSTD
261  bbstreamer_zstd_frame *streamer;
262 
263  Assert(next != NULL);
264 
265  streamer = palloc0(sizeof(bbstreamer_zstd_frame));
266  *((const bbstreamer_ops **) &streamer->base.bbs_ops) =
267  &bbstreamer_zstd_decompressor_ops;
268 
269  streamer->base.bbs_next = next;
270  initStringInfo(&streamer->base.bbs_buffer);
271  enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
272 
273  streamer->dctx = ZSTD_createDCtx();
274  if (!streamer->dctx)
275  pg_fatal("could not create zstd decompression context");
276 
277  /* Initialize the ZSTD output buffer. */
278  streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
279  streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
280  streamer->zstd_outBuf.pos = 0;
281 
282  return &streamer->base;
283 #else
284  pg_fatal("this build does not support compression with %s", "ZSTD");
285  return NULL; /* keep compiler quiet */
286 #endif
287 }
288 
289 #ifdef USE_ZSTD
290 /*
291  * Decompress the input data to output buffer until we run out of input
292  * data. Each time the output buffer is full, pass on the decompressed data
293  * to the next streamer.
294  */
295 static void
296 bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
297  bbstreamer_member *member,
298  const char *data, int len,
300 {
301  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
302  ZSTD_inBuffer inBuf = {data, len, 0};
303 
304  while (inBuf.pos < inBuf.size)
305  {
306  size_t ret;
307 
308  /*
309  * If output buffer is full then forward the content to next streamer
310  * and update the output buffer.
311  */
312  if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
313  {
314  bbstreamer_content(mystreamer->base.bbs_next, member,
315  mystreamer->zstd_outBuf.dst,
316  mystreamer->zstd_outBuf.pos,
317  context);
318 
319  /* Reset the ZSTD output buffer. */
320  mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
321  mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
322  mystreamer->zstd_outBuf.pos = 0;
323  }
324 
325  ret = ZSTD_decompressStream(mystreamer->dctx,
326  &mystreamer->zstd_outBuf, &inBuf);
327 
328  if (ZSTD_isError(ret))
329  pg_log_error("could not decompress data: %s",
330  ZSTD_getErrorName(ret));
331  }
332 }
333 
334 /*
335  * End-of-stream processing.
336  */
337 static void
338 bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer)
339 {
340  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
341 
342  /*
343  * End of the stream, if there is some pending data in output buffers then
344  * we must forward it to next streamer.
345  */
346  if (mystreamer->zstd_outBuf.pos > 0)
347  bbstreamer_content(mystreamer->base.bbs_next, NULL,
348  mystreamer->base.bbs_buffer.data,
349  mystreamer->base.bbs_buffer.maxlen,
351 
352  bbstreamer_finalize(mystreamer->base.bbs_next);
353 }
354 
355 /*
356  * Free memory.
357  */
358 static void
359 bbstreamer_zstd_decompressor_free(bbstreamer *streamer)
360 {
361  bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
362 
363  bbstreamer_free(streamer->bbs_next);
364  ZSTD_freeDCtx(mystreamer->dctx);
365  pfree(streamer->bbs_buffer.data);
366  pfree(streamer);
367 }
368 #endif
static void bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:126
static void bbstreamer_finalize(bbstreamer *streamer)
Definition: bbstreamer.h:136
bbstreamer_archive_context
Definition: bbstreamer.h:54
@ BBSTREAMER_UNKNOWN
Definition: bbstreamer.h:55
static void bbstreamer_free(bbstreamer *streamer)
Definition: bbstreamer.h:144
bbstreamer * bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress)
bbstreamer * bbstreamer_zstd_decompressor_new(bbstreamer *next)
static int32 next
Definition: blutils.c:221
#define Assert(condition)
Definition: c.h:858
#define PG_COMPRESSION_OPTION_WORKERS
Definition: compression.h:29
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
Definition: compression.h:30
exit(1)
#define pg_log_error(...)
Definition: logging.h:106
void pfree(void *pointer)
Definition: mcxt.c:1520
void * palloc0(Size size)
Definition: mcxt.c:1346
#define pg_fatal(...)
const void size_t len
const void * data
tree context
Definition: radixtree.h:1833
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:289
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void(* content)(bbstreamer *streamer, bbstreamer_member *member, const char *data, int len, bbstreamer_archive_context context)
Definition: bbstreamer.h:117
StringInfoData bbs_buffer
Definition: bbstreamer.h:102
bbstreamer * bbs_next
Definition: bbstreamer.h:101