PostgreSQL Source Code  git master
basebackup_zstd.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * basebackup_zstd.c
4  * Basebackup sink implementing zstd compression.
5  *
6  * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/backup/basebackup_zstd.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #ifdef USE_ZSTD
16 #include <zstd.h>
17 #endif
18 
19 #include "backup/basebackup_sink.h"
20 
21 #ifdef USE_ZSTD
22 
23 typedef struct bbsink_zstd
24 {
25  /* Common information for all types of sink. */
26  bbsink base;
27 
28  /* Compression options */
29  pg_compress_specification *compress;
30 
31  ZSTD_CCtx *cctx;
32  ZSTD_outBuffer zstd_outBuf;
33 } bbsink_zstd;
34 
35 static void bbsink_zstd_begin_backup(bbsink *sink);
36 static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
37 static void bbsink_zstd_archive_contents(bbsink *sink, size_t len);
38 static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
39 static void bbsink_zstd_end_archive(bbsink *sink);
40 static void bbsink_zstd_cleanup(bbsink *sink);
41 static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
42  TimeLineID endtli);
43 
44 static const bbsink_ops bbsink_zstd_ops = {
45  .begin_backup = bbsink_zstd_begin_backup,
46  .begin_archive = bbsink_zstd_begin_archive,
47  .archive_contents = bbsink_zstd_archive_contents,
48  .end_archive = bbsink_zstd_end_archive,
49  .begin_manifest = bbsink_forward_begin_manifest,
50  .manifest_contents = bbsink_zstd_manifest_contents,
51  .end_manifest = bbsink_forward_end_manifest,
52  .end_backup = bbsink_zstd_end_backup,
53  .cleanup = bbsink_zstd_cleanup
54 };
55 #endif
56 
57 /*
58  * Create a new basebackup sink that performs zstd compression.
59  */
60 bbsink *
62 {
63 #ifndef USE_ZSTD
64  ereport(ERROR,
65  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
66  errmsg("zstd compression is not supported by this build")));
67  return NULL; /* keep compiler quiet */
68 #else
69  bbsink_zstd *sink;
70 
71  Assert(next != NULL);
72 
73  sink = palloc0(sizeof(bbsink_zstd));
74  *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
75  sink->base.bbs_next = next;
76  sink->compress = compress;
77 
78  return &sink->base;
79 #endif
80 }
81 
82 #ifdef USE_ZSTD
83 
84 /*
85  * Begin backup.
86  */
87 static void
88 bbsink_zstd_begin_backup(bbsink *sink)
89 {
90  bbsink_zstd *mysink = (bbsink_zstd *) sink;
91  size_t output_buffer_bound;
92  size_t ret;
93  pg_compress_specification *compress = mysink->compress;
94 
95  mysink->cctx = ZSTD_createCCtx();
96  if (!mysink->cctx)
97  elog(ERROR, "could not create zstd compression context");
98 
99  ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
100  compress->level);
101  if (ZSTD_isError(ret))
102  elog(ERROR, "could not set zstd compression level to %d: %s",
103  compress->level, ZSTD_getErrorName(ret));
104 
105  if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
106  {
107  /*
108  * On older versions of libzstd, this option does not exist, and
109  * trying to set it will fail. Similarly for newer versions if they
110  * are compiled without threading support.
111  */
112  ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
113  compress->workers);
114  if (ZSTD_isError(ret))
115  ereport(ERROR,
116  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117  errmsg("could not set compression worker count to %d: %s",
118  compress->workers, ZSTD_getErrorName(ret)));
119  }
120 
121  if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
122  {
123  ret = ZSTD_CCtx_setParameter(mysink->cctx,
124  ZSTD_c_enableLongDistanceMatching,
125  compress->long_distance);
126  if (ZSTD_isError(ret))
127  ereport(ERROR,
128  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
129  errmsg("could not enable long-distance mode: %s",
130  ZSTD_getErrorName(ret)));
131  }
132 
133  /*
134  * We need our own buffer, because we're going to pass different data to
135  * the next sink than what gets passed to us.
136  */
137  mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
138 
139  /*
140  * Make sure that the next sink's bbs_buffer is big enough to accommodate
141  * the compressed input buffer.
142  */
143  output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
144 
145  /*
146  * The buffer length is expected to be a multiple of BLCKSZ, so round up.
147  */
148  output_buffer_bound = output_buffer_bound + BLCKSZ -
149  (output_buffer_bound % BLCKSZ);
150 
151  bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
152 }
153 
154 /*
155  * Prepare to compress the next archive.
156  */
157 static void
158 bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
159 {
160  bbsink_zstd *mysink = (bbsink_zstd *) sink;
161  char *zstd_archive_name;
162 
163  /*
164  * At the start of each archive we reset the state to start a new
165  * compression operation. The parameters are sticky and they will stick
166  * around as we are resetting with option ZSTD_reset_session_only.
167  */
168  ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
169 
170  mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
171  mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
172  mysink->zstd_outBuf.pos = 0;
173 
174  /* Add ".zst" to the archive name. */
175  zstd_archive_name = psprintf("%s.zst", archive_name);
176  Assert(sink->bbs_next != NULL);
177  bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
178  pfree(zstd_archive_name);
179 }
180 
181 /*
182  * Compress the input data to the output buffer until we run out of input
183  * data. Each time the output buffer falls below the compression bound for
184  * the input buffer, invoke the archive_contents() method for the next sink.
185  *
186  * Note that since we're compressing the input, it may very commonly happen
187  * that we consume all the input data without filling the output buffer. In
188  * that case, the compressed representation of the current input data won't
189  * actually be sent to the next bbsink until a later call to this function,
190  * or perhaps even not until bbsink_zstd_end_archive() is invoked.
191  */
192 static void
193 bbsink_zstd_archive_contents(bbsink *sink, size_t len)
194 {
195  bbsink_zstd *mysink = (bbsink_zstd *) sink;
196  ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
197 
198  while (inBuf.pos < inBuf.size)
199  {
200  size_t yet_to_flush;
201  size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
202 
203  /*
204  * If the out buffer is not left with enough space, send the output
205  * buffer to the next sink, and reset it.
206  */
207  if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
208  {
209  bbsink_archive_contents(mysink->base.bbs_next,
210  mysink->zstd_outBuf.pos);
211  mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
212  mysink->zstd_outBuf.size =
213  mysink->base.bbs_next->bbs_buffer_length;
214  mysink->zstd_outBuf.pos = 0;
215  }
216 
217  yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
218  &inBuf, ZSTD_e_continue);
219 
220  if (ZSTD_isError(yet_to_flush))
221  elog(ERROR,
222  "could not compress data: %s",
223  ZSTD_getErrorName(yet_to_flush));
224  }
225 }
226 
227 /*
228  * There might be some data inside zstd's internal buffers; we need to get that
229  * flushed out, also end the zstd frame and then get that forwarded to the
230  * successor sink as archive content.
231  *
232  * Then we can end processing for this archive.
233  */
234 static void
235 bbsink_zstd_end_archive(bbsink *sink)
236 {
237  bbsink_zstd *mysink = (bbsink_zstd *) sink;
238  size_t yet_to_flush;
239 
240  do
241  {
242  ZSTD_inBuffer in = {NULL, 0, 0};
243  size_t max_needed = ZSTD_compressBound(0);
244 
245  /*
246  * If the out buffer is not left with enough space, send the output
247  * buffer to the next sink, and reset it.
248  */
249  if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
250  {
251  bbsink_archive_contents(mysink->base.bbs_next,
252  mysink->zstd_outBuf.pos);
253  mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
254  mysink->zstd_outBuf.size =
255  mysink->base.bbs_next->bbs_buffer_length;
256  mysink->zstd_outBuf.pos = 0;
257  }
258 
259  yet_to_flush = ZSTD_compressStream2(mysink->cctx,
260  &mysink->zstd_outBuf,
261  &in, ZSTD_e_end);
262 
263  if (ZSTD_isError(yet_to_flush))
264  elog(ERROR, "could not compress data: %s",
265  ZSTD_getErrorName(yet_to_flush));
266 
267  } while (yet_to_flush > 0);
268 
269  /* Make sure to pass any remaining bytes to the next sink. */
270  if (mysink->zstd_outBuf.pos > 0)
271  bbsink_archive_contents(mysink->base.bbs_next,
272  mysink->zstd_outBuf.pos);
273 
274  /* Pass on the information that this archive has ended. */
276 }
277 
278 /*
279  * Free the resources and context.
280  */
281 static void
282 bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
283  TimeLineID endtli)
284 {
285  bbsink_zstd *mysink = (bbsink_zstd *) sink;
286 
287  /* Release the context. */
288  if (mysink->cctx)
289  {
290  ZSTD_freeCCtx(mysink->cctx);
291  mysink->cctx = NULL;
292  }
293 
294  bbsink_forward_end_backup(sink, endptr, endtli);
295 }
296 
297 /*
298  * Manifest contents are not compressed, but we do need to copy them into
299  * the successor sink's buffer, because we have our own.
300  */
301 static void
302 bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
303 {
304  memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
306 }
307 
308 /*
309  * In case the backup fails, make sure we free any compression context that
310  * got allocated, so that we don't leak memory.
311  */
312 static void
313 bbsink_zstd_cleanup(bbsink *sink)
314 {
315  bbsink_zstd *mysink = (bbsink_zstd *) sink;
316 
317  /* Release the context if not already released. */
318  if (mysink->cctx)
319  {
320  ZSTD_freeCCtx(mysink->cctx);
321  mysink->cctx = NULL;
322  }
323 }
324 
325 #endif
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length)
static void bbsink_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_archive_contents(bbsink *sink, size_t len)
static void bbsink_manifest_contents(bbsink *sink, size_t len)
bbsink * bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
static int32 next
Definition: blutils.c:222
#define Assert(condition)
Definition: c.h:858
#define PG_COMPRESSION_OPTION_WORKERS
Definition: compression.h:29
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
Definition: compression.h:30
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
void * palloc(Size size)
Definition: mcxt.c:1317
const void size_t len
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
void(* begin_backup)(bbsink *sink)
bbsink * bbs_next
bbsink_state * bbs_state
char * bbs_buffer
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59