PostgreSQL Source Code  git master
basebackup_zstd.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * basebackup_zstd.c
4  * Basebackup sink implementing zstd compression.
5  *
6  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/backup/basebackup_zstd.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #ifdef USE_ZSTD
16 #include <zstd.h>
17 #endif
18 
19 #include "backup/basebackup_sink.h"
20 
21 #ifdef USE_ZSTD
22 
/*
 * State for a basebackup sink that zstd-compresses archive contents before
 * handing them to the successor sink.
 *
 * 'base' must remain the first member: the callbacks cast the generic
 * bbsink pointer they receive to bbsink_zstd *, and bbsink_zstd_new returns
 * &sink->base.
 */
typedef struct bbsink_zstd
{
	/* Common information for all types of sink. */
	bbsink		base;

	/*
	 * Compression options.  Stored (not copied) by bbsink_zstd_new and read
	 * in bbsink_zstd_begin_backup (level and optional worker count).
	 */
	pg_compress_specification *compress;

	/*
	 * zstd compression context.  Created in bbsink_zstd_begin_backup, freed
	 * and reset to NULL by bbsink_zstd_end_backup or bbsink_zstd_cleanup.
	 */
	ZSTD_CCtx  *cctx;

	/*
	 * Output window into the successor sink's bbs_buffer; 'pos' counts the
	 * compressed bytes written there that have not yet been passed on via
	 * bbsink_archive_contents.
	 */
	ZSTD_outBuffer zstd_outBuf;
} bbsink_zstd;
34 
35 static void bbsink_zstd_begin_backup(bbsink *sink);
36 static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
37 static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in);
38 static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
39 static void bbsink_zstd_end_archive(bbsink *sink);
40 static void bbsink_zstd_cleanup(bbsink *sink);
41 static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
42  TimeLineID endtli);
43 
44 static const bbsink_ops bbsink_zstd_ops = {
45  .begin_backup = bbsink_zstd_begin_backup,
46  .begin_archive = bbsink_zstd_begin_archive,
47  .archive_contents = bbsink_zstd_archive_contents,
48  .end_archive = bbsink_zstd_end_archive,
49  .begin_manifest = bbsink_forward_begin_manifest,
50  .manifest_contents = bbsink_zstd_manifest_contents,
51  .end_manifest = bbsink_forward_end_manifest,
52  .end_backup = bbsink_zstd_end_backup,
53  .cleanup = bbsink_zstd_cleanup
54 };
55 #endif
56 
57 /*
58  * Create a new basebackup sink that performs zstd compression.
59  */
60 bbsink *
62 {
63 #ifndef USE_ZSTD
64  ereport(ERROR,
65  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
66  errmsg("zstd compression is not supported by this build")));
67  return NULL; /* keep compiler quiet */
68 #else
69  bbsink_zstd *sink;
70 
71  Assert(next != NULL);
72 
73  sink = palloc0(sizeof(bbsink_zstd));
74  *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
75  sink->base.bbs_next = next;
76  sink->compress = compress;
77 
78  return &sink->base;
79 #endif
80 }
81 
82 #ifdef USE_ZSTD
83 
84 /*
85  * Begin backup.
86  */
87 static void
88 bbsink_zstd_begin_backup(bbsink *sink)
89 {
90  bbsink_zstd *mysink = (bbsink_zstd *) sink;
91  size_t output_buffer_bound;
92  size_t ret;
93  pg_compress_specification *compress = mysink->compress;
94 
95  mysink->cctx = ZSTD_createCCtx();
96  if (!mysink->cctx)
97  elog(ERROR, "could not create zstd compression context");
98 
99  ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
100  compress->level);
101  if (ZSTD_isError(ret))
102  elog(ERROR, "could not set zstd compression level to %d: %s",
103  compress->level, ZSTD_getErrorName(ret));
104 
105  if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
106  {
107  /*
108  * On older versions of libzstd, this option does not exist, and
109  * trying to set it will fail. Similarly for newer versions if they
110  * are compiled without threading support.
111  */
112  ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
113  compress->workers);
114  if (ZSTD_isError(ret))
115  ereport(ERROR,
116  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117  errmsg("could not set compression worker count to %d: %s",
118  compress->workers, ZSTD_getErrorName(ret)));
119  }
120 
121  /*
122  * We need our own buffer, because we're going to pass different data to
123  * the next sink than what gets passed to us.
124  */
125  mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
126 
127  /*
128  * Make sure that the next sink's bbs_buffer is big enough to accommodate
129  * the compressed input buffer.
130  */
131  output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
132 
133  /*
134  * The buffer length is expected to be a multiple of BLCKSZ, so round up.
135  */
136  output_buffer_bound = output_buffer_bound + BLCKSZ -
137  (output_buffer_bound % BLCKSZ);
138 
139  bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
140 }
141 
142 /*
143  * Prepare to compress the next archive.
144  */
145 static void
146 bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
147 {
148  bbsink_zstd *mysink = (bbsink_zstd *) sink;
149  char *zstd_archive_name;
150 
151  /*
152  * At the start of each archive we reset the state to start a new
153  * compression operation. The parameters are sticky and they will stick
154  * around as we are resetting with option ZSTD_reset_session_only.
155  */
156  ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
157 
158  mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
159  mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
160  mysink->zstd_outBuf.pos = 0;
161 
162  /* Add ".zst" to the archive name. */
163  zstd_archive_name = psprintf("%s.zst", archive_name);
164  Assert(sink->bbs_next != NULL);
165  bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
166  pfree(zstd_archive_name);
167 }
168 
169 /*
170  * Compress the input data to the output buffer until we run out of input
171  * data. Each time the output buffer falls below the compression bound for
172  * the input buffer, invoke the archive_contents() method for the next sink.
173  *
174  * Note that since we're compressing the input, it may very commonly happen
175  * that we consume all the input data without filling the output buffer. In
176  * that case, the compressed representation of the current input data won't
177  * actually be sent to the next bbsink until a later call to this function,
178  * or perhaps even not until bbsink_zstd_end_archive() is invoked.
179  */
180 static void
181 bbsink_zstd_archive_contents(bbsink *sink, size_t len)
182 {
183  bbsink_zstd *mysink = (bbsink_zstd *) sink;
184  ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
185 
186  while (inBuf.pos < inBuf.size)
187  {
188  size_t yet_to_flush;
189  size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
190 
191  /*
192  * If the out buffer is not left with enough space, send the output
193  * buffer to the next sink, and reset it.
194  */
195  if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
196  {
197  bbsink_archive_contents(mysink->base.bbs_next,
198  mysink->zstd_outBuf.pos);
199  mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
200  mysink->zstd_outBuf.size =
201  mysink->base.bbs_next->bbs_buffer_length;
202  mysink->zstd_outBuf.pos = 0;
203  }
204 
205  yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
206  &inBuf, ZSTD_e_continue);
207 
208  if (ZSTD_isError(yet_to_flush))
209  elog(ERROR,
210  "could not compress data: %s",
211  ZSTD_getErrorName(yet_to_flush));
212  }
213 }
214 
215 /*
216  * There might be some data inside zstd's internal buffers; we need to get that
217  * flushed out, also end the zstd frame and then get that forwarded to the
218  * successor sink as archive content.
219  *
220  * Then we can end processing for this archive.
221  */
222 static void
223 bbsink_zstd_end_archive(bbsink *sink)
224 {
225  bbsink_zstd *mysink = (bbsink_zstd *) sink;
226  size_t yet_to_flush;
227 
228  do
229  {
230  ZSTD_inBuffer in = {NULL, 0, 0};
231  size_t max_needed = ZSTD_compressBound(0);
232 
233  /*
234  * If the out buffer is not left with enough space, send the output
235  * buffer to the next sink, and reset it.
236  */
237  if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
238  {
239  bbsink_archive_contents(mysink->base.bbs_next,
240  mysink->zstd_outBuf.pos);
241  mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
242  mysink->zstd_outBuf.size =
243  mysink->base.bbs_next->bbs_buffer_length;
244  mysink->zstd_outBuf.pos = 0;
245  }
246 
247  yet_to_flush = ZSTD_compressStream2(mysink->cctx,
248  &mysink->zstd_outBuf,
249  &in, ZSTD_e_end);
250 
251  if (ZSTD_isError(yet_to_flush))
252  elog(ERROR, "could not compress data: %s",
253  ZSTD_getErrorName(yet_to_flush));
254 
255  } while (yet_to_flush > 0);
256 
257  /* Make sure to pass any remaining bytes to the next sink. */
258  if (mysink->zstd_outBuf.pos > 0)
259  bbsink_archive_contents(mysink->base.bbs_next,
260  mysink->zstd_outBuf.pos);
261 
262  /* Pass on the information that this archive has ended. */
264 }
265 
266 /*
267  * Free the resources and context.
268  */
269 static void
270 bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
271  TimeLineID endtli)
272 {
273  bbsink_zstd *mysink = (bbsink_zstd *) sink;
274 
275  /* Release the context. */
276  if (mysink->cctx)
277  {
278  ZSTD_freeCCtx(mysink->cctx);
279  mysink->cctx = NULL;
280  }
281 
282  bbsink_forward_end_backup(sink, endptr, endtli);
283 }
284 
285 /*
286  * Manifest contents are not compressed, but we do need to copy them into
287  * the successor sink's buffer, because we have our own.
288  */
289 static void
290 bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
291 {
292  memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
294 }
295 
296 /*
297  * In case the backup fails, make sure we free any compression context that
298  * got allocated, so that we don't leak memory.
299  */
300 static void
301 bbsink_zstd_cleanup(bbsink *sink)
302 {
303  bbsink_zstd *mysink = (bbsink_zstd *) sink;
304 
305  /* Release the context if not already released. */
306  if (mysink->cctx)
307  {
308  ZSTD_freeCCtx(mysink->cctx);
309  mysink->cctx = NULL;
310  }
311 }
312 
313 #endif
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length)
static void bbsink_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_archive_contents(bbsink *sink, size_t len)
static void bbsink_manifest_contents(bbsink *sink, size_t len)
bbsink * bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
static int32 next
Definition: blutils.c:219
#define PG_COMPRESSION_OPTION_WORKERS
Definition: compression.h:25
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
Assert(fmt[strlen(fmt) - 1] !='\n')
void pfree(void *pointer)
Definition: mcxt.c:1436
void * palloc0(Size size)
Definition: mcxt.c:1241
void * palloc(Size size)
Definition: mcxt.c:1210
const void size_t len
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
void(* begin_backup)(bbsink *sink)
bbsink * bbs_next
bbsink_state * bbs_state
char * bbs_buffer
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59