PostgreSQL Source Code  git master
astreamer_zstd.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * astreamer_zstd.c
4  *
5  * Archive streamers that deal with data compressed using zstd.
6  * astreamer_zstd_compressor applies zstd compression to the input stream,
7  * and astreamer_zstd_decompressor does the reverse.
8  *
9  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
10  *
11  * IDENTIFICATION
12  * src/bin/pg_basebackup/astreamer_zstd.c
13  *-------------------------------------------------------------------------
14  */
15 
16 #include "postgres_fe.h"
17 
18 #include <unistd.h>
19 
20 #ifdef USE_ZSTD
21 #include <zstd.h>
22 #endif
23 
24 #include "common/logging.h"
25 #include "fe_utils/astreamer.h"
26 
27 #ifdef USE_ZSTD
28 
29 typedef struct astreamer_zstd_frame
30 {
31  astreamer base;
32 
33  ZSTD_CCtx *cctx;
34  ZSTD_DCtx *dctx;
35  ZSTD_outBuffer zstd_outBuf;
36 } astreamer_zstd_frame;
37 
38 static void astreamer_zstd_compressor_content(astreamer *streamer,
39  astreamer_member *member,
40  const char *data, int len,
42 static void astreamer_zstd_compressor_finalize(astreamer *streamer);
43 static void astreamer_zstd_compressor_free(astreamer *streamer);
44 
45 static const astreamer_ops astreamer_zstd_compressor_ops = {
46  .content = astreamer_zstd_compressor_content,
47  .finalize = astreamer_zstd_compressor_finalize,
48  .free = astreamer_zstd_compressor_free
49 };
50 
51 static void astreamer_zstd_decompressor_content(astreamer *streamer,
52  astreamer_member *member,
53  const char *data, int len,
55 static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
56 static void astreamer_zstd_decompressor_free(astreamer *streamer);
57 
58 static const astreamer_ops astreamer_zstd_decompressor_ops = {
59  .content = astreamer_zstd_decompressor_content,
60  .finalize = astreamer_zstd_decompressor_finalize,
61  .free = astreamer_zstd_decompressor_free
62 };
63 #endif
64 
65 /*
66  * Create a new base backup streamer that performs zstd compression of tar
67  * blocks.
68  */
69 astreamer *
71 {
72 #ifdef USE_ZSTD
73  astreamer_zstd_frame *streamer;
74  size_t ret;
75 
76  Assert(next != NULL);
77 
78  streamer = palloc0(sizeof(astreamer_zstd_frame));
79 
80  *((const astreamer_ops **) &streamer->base.bbs_ops) =
81  &astreamer_zstd_compressor_ops;
82 
83  streamer->base.bbs_next = next;
84  initStringInfo(&streamer->base.bbs_buffer);
85  enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
86 
87  streamer->cctx = ZSTD_createCCtx();
88  if (!streamer->cctx)
89  pg_fatal("could not create zstd compression context");
90 
91  /* Set compression level */
92  ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
93  compress->level);
94  if (ZSTD_isError(ret))
95  pg_fatal("could not set zstd compression level to %d: %s",
96  compress->level, ZSTD_getErrorName(ret));
97 
98  /* Set # of workers, if specified */
99  if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
100  {
101  /*
102  * On older versions of libzstd, this option does not exist, and
103  * trying to set it will fail. Similarly for newer versions if they
104  * are compiled without threading support.
105  */
106  ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
107  compress->workers);
108  if (ZSTD_isError(ret))
109  pg_fatal("could not set compression worker count to %d: %s",
110  compress->workers, ZSTD_getErrorName(ret));
111  }
112 
113  if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
114  {
115  ret = ZSTD_CCtx_setParameter(streamer->cctx,
116  ZSTD_c_enableLongDistanceMatching,
117  compress->long_distance);
118  if (ZSTD_isError(ret))
119  {
120  pg_log_error("could not enable long-distance mode: %s",
121  ZSTD_getErrorName(ret));
122  exit(1);
123  }
124  }
125 
126  /* Initialize the ZSTD output buffer. */
127  streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
128  streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
129  streamer->zstd_outBuf.pos = 0;
130 
131  return &streamer->base;
132 #else
133  pg_fatal("this build does not support compression with %s", "ZSTD");
134  return NULL; /* keep compiler quiet */
135 #endif
136 }
137 
138 #ifdef USE_ZSTD
139 /*
140  * Compress the input data to output buffer.
141  *
142  * Find out the compression bound based on input data length for each
143  * invocation to make sure that output buffer has enough capacity to
144  * accommodate the compressed data. In case if the output buffer
145  * capacity falls short of compression bound then forward the content
146  * of output buffer to next streamer and empty the buffer.
147  */
148 static void
149 astreamer_zstd_compressor_content(astreamer *streamer,
150  astreamer_member *member,
151  const char *data, int len,
153 {
154  astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
155  ZSTD_inBuffer inBuf = {data, len, 0};
156 
157  while (inBuf.pos < inBuf.size)
158  {
159  size_t yet_to_flush;
160  size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
161 
162  /*
163  * If the output buffer is not left with enough space, send the
164  * compressed bytes to the next streamer, and empty the buffer.
165  */
166  if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
167  max_needed)
168  {
169  astreamer_content(mystreamer->base.bbs_next, member,
170  mystreamer->zstd_outBuf.dst,
171  mystreamer->zstd_outBuf.pos,
172  context);
173 
174  /* Reset the ZSTD output buffer. */
175  mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
176  mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
177  mystreamer->zstd_outBuf.pos = 0;
178  }
179 
180  yet_to_flush =
181  ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
182  &inBuf, ZSTD_e_continue);
183 
184  if (ZSTD_isError(yet_to_flush))
185  pg_log_error("could not compress data: %s",
186  ZSTD_getErrorName(yet_to_flush));
187  }
188 }
189 
190 /*
191  * End-of-stream processing.
192  */
193 static void
194 astreamer_zstd_compressor_finalize(astreamer *streamer)
195 {
196  astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
197  size_t yet_to_flush;
198 
199  do
200  {
201  ZSTD_inBuffer in = {NULL, 0, 0};
202  size_t max_needed = ZSTD_compressBound(0);
203 
204  /*
205  * If the output buffer is not left with enough space, send the
206  * compressed bytes to the next streamer, and empty the buffer.
207  */
208  if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
209  max_needed)
210  {
211  astreamer_content(mystreamer->base.bbs_next, NULL,
212  mystreamer->zstd_outBuf.dst,
213  mystreamer->zstd_outBuf.pos,
215 
216  /* Reset the ZSTD output buffer. */
217  mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
218  mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
219  mystreamer->zstd_outBuf.pos = 0;
220  }
221 
222  yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
223  &mystreamer->zstd_outBuf,
224  &in, ZSTD_e_end);
225 
226  if (ZSTD_isError(yet_to_flush))
227  pg_log_error("could not compress data: %s",
228  ZSTD_getErrorName(yet_to_flush));
229 
230  } while (yet_to_flush > 0);
231 
232  /* Make sure to pass any remaining bytes to the next streamer. */
233  if (mystreamer->zstd_outBuf.pos > 0)
234  astreamer_content(mystreamer->base.bbs_next, NULL,
235  mystreamer->zstd_outBuf.dst,
236  mystreamer->zstd_outBuf.pos,
238 
239  astreamer_finalize(mystreamer->base.bbs_next);
240 }
241 
242 /*
243  * Free memory.
244  */
245 static void
246 astreamer_zstd_compressor_free(astreamer *streamer)
247 {
248  astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
249 
250  astreamer_free(streamer->bbs_next);
251  ZSTD_freeCCtx(mystreamer->cctx);
252  pfree(streamer->bbs_buffer.data);
253  pfree(streamer);
254 }
255 #endif
256 
257 /*
258  * Create a new base backup streamer that performs decompression of zstd
259  * compressed blocks.
260  */
261 astreamer *
263 {
264 #ifdef USE_ZSTD
265  astreamer_zstd_frame *streamer;
266 
267  Assert(next != NULL);
268 
269  streamer = palloc0(sizeof(astreamer_zstd_frame));
270  *((const astreamer_ops **) &streamer->base.bbs_ops) =
271  &astreamer_zstd_decompressor_ops;
272 
273  streamer->base.bbs_next = next;
274  initStringInfo(&streamer->base.bbs_buffer);
275  enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
276 
277  streamer->dctx = ZSTD_createDCtx();
278  if (!streamer->dctx)
279  pg_fatal("could not create zstd decompression context");
280 
281  /* Initialize the ZSTD output buffer. */
282  streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
283  streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
284  streamer->zstd_outBuf.pos = 0;
285 
286  return &streamer->base;
287 #else
288  pg_fatal("this build does not support compression with %s", "ZSTD");
289  return NULL; /* keep compiler quiet */
290 #endif
291 }
292 
293 #ifdef USE_ZSTD
294 /*
295  * Decompress the input data to output buffer until we run out of input
296  * data. Each time the output buffer is full, pass on the decompressed data
297  * to the next streamer.
298  */
299 static void
300 astreamer_zstd_decompressor_content(astreamer *streamer,
301  astreamer_member *member,
302  const char *data, int len,
304 {
305  astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
306  ZSTD_inBuffer inBuf = {data, len, 0};
307 
308  while (inBuf.pos < inBuf.size)
309  {
310  size_t ret;
311 
312  /*
313  * If output buffer is full then forward the content to next streamer
314  * and update the output buffer.
315  */
316  if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
317  {
318  astreamer_content(mystreamer->base.bbs_next, member,
319  mystreamer->zstd_outBuf.dst,
320  mystreamer->zstd_outBuf.pos,
321  context);
322 
323  /* Reset the ZSTD output buffer. */
324  mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
325  mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
326  mystreamer->zstd_outBuf.pos = 0;
327  }
328 
329  ret = ZSTD_decompressStream(mystreamer->dctx,
330  &mystreamer->zstd_outBuf, &inBuf);
331 
332  if (ZSTD_isError(ret))
333  pg_log_error("could not decompress data: %s",
334  ZSTD_getErrorName(ret));
335  }
336 }
337 
338 /*
339  * End-of-stream processing.
340  */
341 static void
342 astreamer_zstd_decompressor_finalize(astreamer *streamer)
343 {
344  astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
345 
346  /*
347  * End of the stream, if there is some pending data in output buffers then
348  * we must forward it to next streamer.
349  */
350  if (mystreamer->zstd_outBuf.pos > 0)
351  astreamer_content(mystreamer->base.bbs_next, NULL,
352  mystreamer->base.bbs_buffer.data,
353  mystreamer->base.bbs_buffer.maxlen,
355 
356  astreamer_finalize(mystreamer->base.bbs_next);
357 }
358 
359 /*
360  * Free memory.
361  */
362 static void
363 astreamer_zstd_decompressor_free(astreamer *streamer)
364 {
365  astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
366 
367  astreamer_free(streamer->bbs_next);
368  ZSTD_freeDCtx(mystreamer->dctx);
369  pfree(streamer->bbs_buffer.data);
370  pfree(streamer);
371 }
372 #endif
static void astreamer_free(astreamer *streamer)
Definition: astreamer.h:153
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition: astreamer.h:135
static void astreamer_finalize(astreamer *streamer)
Definition: astreamer.h:145
astreamer_archive_context
Definition: astreamer.h:63
@ ASTREAMER_UNKNOWN
Definition: astreamer.h:64
astreamer * astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
astreamer * astreamer_zstd_decompressor_new(astreamer *next)
static int32 next
Definition: blutils.c:222
#define Assert(condition)
Definition: c.h:849
#define PG_COMPRESSION_OPTION_WORKERS
Definition: compression.h:29
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
Definition: compression.h:30
exit(1)
#define pg_log_error(...)
Definition: logging.h:106
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
#define pg_fatal(...)
const void size_t len
const void * data
tree context
Definition: radixtree.h:1835
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:289
void initStringInfo(StringInfo str)
Definition: stringinfo.c:59
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition: astreamer.h:126
StringInfoData bbs_buffer
Definition: astreamer.h:111
astreamer * bbs_next
Definition: astreamer.h:110